diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fed2574..18cd941 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -258,6 +258,16 @@ RM_PREFIX + "nodemanagers.heartbeat-interval-ms"; public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000; + /** The setting that controls whether RM writes history data. */ + public static final String RM_AHS_WRITER_ENABLED = RM_PREFIX + + "ahs.writer.enabled"; + public static final boolean DEFAULT_RM_AHS_WRITER_ENABLED = false; + + /** Number of worker threads that write the history data. */ + public static final String RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE = RM_PREFIX + + "ahs.writer.dispatcher-vector-size"; + public static final int DEFAULT_RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE = 10; + //Delegation token related keys public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY = RM_PREFIX + "delegation.key.update-interval"; @@ -862,6 +872,10 @@ public static final String AHS_PREFIX = YARN_PREFIX + "ahs."; + /** The implementation class of ApplicationHistoryStore, which is to be used + * by RMApplicationHistoryWriter and ApplicationHistoryServer. */ + public static final String AHS_STORE_CLASS = AHS_PREFIX + "store.class"; + /** URI for FileSystemApplicationHistoryStore */ public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b88a9c4..600e476 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -420,6 +420,19 @@ org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy + + Enable RM to write history data. If true, then + yarn.resourcemanager.ahs.writer.class must be specified + yarn.resourcemanager.ahs.writer.enabled + false + + + + Number of worker threads that write the history data. + yarn.resourcemanager.ahs.writer.dispatcher-vector-size + 10 + + The hostname of the NM. @@ -881,10 +894,18 @@ + The implementation class of ApplicationHistoryStore, which is + to be used by RMApplicationHistoryWriter and ApplicationHistoryServer. + + yarn.ahs.store.class + org.apache.hadoop.yarn.server.applicationhistoryservice.DummyApplicationHistoryStore + + + URI pointing to the location of the FileSystem path where the history will be persisted. This must be supplied when using org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore - as the value for yarn.resourcemanager.ahs.writer.class + as the value for yarn.ahs.store.class yarn.ahs.fs-history-store.uri ${hadoop.log.dir}/yarn/system/ahstore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index 33350b4..a6d8c1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -26,9 +26,9 @@ 4.0.0 org.apache.hadoop - hadoop-yarn-server-applicationhistoryserver + hadoop-yarn-server-applicationhistoryservice 2.3.0-SNAPSHOT - hadoop-yarn-server-applicationhistoryserver + hadoop-yarn-server-applicationhistoryservice diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/DummyApplicationHistoryStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/DummyApplicationHistoryStore.java new file mode 100644 index 0000000..e3d0a03 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/DummyApplicationHistoryStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * Dummy implementation of {@link ApplicationHistoryStore}. If this + * implementation is used, no history data will be persisted. + * + */ +@Unstable +@Private +public class DummyApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + public DummyApplicationHistoryStore() { + super(DummyApplicationHistoryStore.class.getName()); + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + return null; + } + + @Override + public Map getAllApplications() + throws IOException { + return Collections.emptyMap(); + } + + @Override + public Map + getApplicationAttempts(ApplicationId appId) throws IOException { + return Collections.emptyMap(); + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return null; + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + return null; + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return null; + } + + @Override + public Map getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + return Collections.emptyMap(); + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java index b08023e..6629380 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/impl/pb/ApplicationStartDataPBImpl.java @@ -148,7 +148,7 @@ public void setQueue(String queue) { @Override public long getSubmitTime() { ApplicationStartDataProtoOrBuilder p = viaProto ? proto : builder; - return p.getStartTime(); + return p.getSubmitTime(); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 6cf26f1..f084215 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -39,6 +39,11 @@ org.apache.hadoop + hadoop-yarn-server-applicationhistoryservice + 2.3.0-SNAPSHOT + + + org.apache.hadoop hadoop-yarn-server-web-proxy diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 28101cc..776685b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; @@ -31,8 +32,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; /** @@ -74,4 +75,10 @@ void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager); + + RMApplicationHistoryWriter getRMApplicationHistoryWriter(); + + void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter); + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d2592ed..1e894f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -34,8 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import com.google.common.annotations.VisibleForTesting; @@ -44,14 +45,14 @@ private final Dispatcher rmDispatcher; - private final ConcurrentMap applications - = new ConcurrentHashMap(); + private final ConcurrentMap applications = + new ConcurrentHashMap(); + + private final ConcurrentMap nodes = + new ConcurrentHashMap(); - private final ConcurrentMap nodes - = new ConcurrentHashMap(); - - private final ConcurrentMap inactiveNodes - = new ConcurrentHashMap(); + private final ConcurrentMap inactiveNodes = + new ConcurrentHashMap(); private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; @@ -64,6 +65,7 @@ private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; private ClientRMService clientRMService; private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; public RMContextImpl(Dispatcher rmDispatcher, RMStateStore store, @@ -74,7 +76,8 @@ public RMContextImpl(Dispatcher rmDispatcher, AMRMTokenSecretManager amRMTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, - ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { this.rmDispatcher = rmDispatcher; this.stateStore = store; this.containerAllocationExpirer = containerAllocationExpirer; @@ -85,6 +88,7 @@ public RMContextImpl(Dispatcher rmDispatcher, this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; } @VisibleForTesting @@ -97,11 +101,12 @@ public RMContextImpl(Dispatcher rmDispatcher, AMRMTokenSecretManager appTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, - ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, - containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager); + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, + amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, + containerTokenSecretManager, nmTokenSecretManager, + clientToAMTokenSecretManager, rmApplicationHistoryWriter); RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); try { @@ -111,13 +116,13 @@ public RMContextImpl(Dispatcher rmDispatcher, assert false; } } - + @Override public Dispatcher getDispatcher() { return this.rmDispatcher; } - - @Override + + @Override public RMStateStore getStateStore() { return stateStore; } @@ -131,7 +136,7 @@ public RMStateStore getStateStore() { public ConcurrentMap getRMNodes() { return this.nodes; } - + @Override public ConcurrentMap getInactiveRMNodes() { return this.inactiveNodes; @@ -166,40 +171,52 @@ public AMRMTokenSecretManager getAMRMTokenSecretManager() { public RMContainerTokenSecretManager getContainerTokenSecretManager() { return this.containerTokenSecretManager; } - + @Override public NMTokenSecretManagerInRM getNMTokenSecretManager() { return this.nmTokenSecretManager; } - + @Override public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return this.clientToAMTokenSecretManager; } - + @VisibleForTesting public void setStateStore(RMStateStore store) { stateStore = store; } - + @Override public ClientRMService getClientRMService() { return this.clientRMService; } - + @Override public void setClientRMService(ClientRMService clientRMService) { this.clientRMService = clientRMService; } - + @Override public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { return this.rmDelegationTokenSecretManager; } - + @Override public void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager) { this.rmDelegationTokenSecretManager = delegationTokenSecretManager; } + + @Override + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return rmApplicationHistoryWriter; + } + + @Override + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; + } + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index f9ee097..5b5b34d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; @@ -266,6 +267,10 @@ protected RMAppManager createRMAppManager() { this.applicationACLsManager, this.conf); } + protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { + return new RMApplicationHistoryWriter(); + } + // sanity check for configurations protected static void validateConfigs(Configuration conf) { // validate max-attempts @@ -350,11 +355,15 @@ protected void serviceInit(Configuration configuration) throws Exception { delegationTokenRenewer = createDelegationTokenRenewer(); } + RMApplicationHistoryWriter rmApplicationHistoryWriter = + createRMApplicationHistoryWriter(); + addService(rmApplicationHistoryWriter); + rmContext = new RMContextImpl( rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager, - clientToAMSecretManager); + clientToAMSecretManager, rmApplicationHistoryWriter); // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java new file mode 100644 index 0000000..77f7e62 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.applicationhistoryservice.DummyApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + *

+ * {@link ResourceManager} uses this class to write the information of + * {@link RMApp}, {@link RMAppAttempt} and {@link RMContainer}. These APIs are + * non-blocking, and just schedule a writing history event. An self-contained + * dispatcher vector will handle the event in separate threads, and extract the + * required fields that are going to be persisted. Then, the extracted + * information will be persisted via the implementation of + * {@link ApplicationHistoryStore}. + *

+ */ +@Private +@Unstable +public class RMApplicationHistoryWriter extends CompositeService { + + public static final Log LOG = + LogFactory.getLog(RMApplicationHistoryWriter.class); + + private Dispatcher dispatcher; + private ApplicationHistoryWriter writer; + + public RMApplicationHistoryWriter() { + super(RMApplicationHistoryWriter.class.getName()); + } + + @Override + protected synchronized void serviceInit( + Configuration conf) throws Exception { + writer = createApplicationHistoryStore(conf); + addIfService(writer); + + dispatcher = createDispatcher(conf); + dispatcher.register( + WritingHistoryEventType.class, new ForwardingEventHandler()); + addIfService(dispatcher); + super.serviceInit(conf); + } + + protected Dispatcher createDispatcher(Configuration conf) { + HomoDispatcherVector dispatcher = new HomoDispatcherVector(conf.getInt( + YarnConfiguration.RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE, + YarnConfiguration.DEFAULT_RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE)); + dispatcher.setDrainingStop(); + return dispatcher; + } + + protected ApplicationHistoryStore createApplicationHistoryStore( + Configuration conf) { + boolean ahsEnabled = conf.getBoolean( + YarnConfiguration.RM_AHS_WRITER_ENABLED, + YarnConfiguration.DEFAULT_RM_AHS_WRITER_ENABLED); + // If the history writer is not enabled, a dummy store will be used to + // write nothing + if (ahsEnabled) { + try { + Class storeClass = + conf.getClass(YarnConfiguration.AHS_STORE_CLASS, + DummyApplicationHistoryStore.class, + ApplicationHistoryStore.class); + return storeClass.newInstance(); + } catch (Exception e) { + String msg = "Could not instantiate ApplicationHistoryWriter: " + + conf.get(YarnConfiguration.AHS_STORE_CLASS, + DummyApplicationHistoryStore.class.getName()); + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } else { + return new DummyApplicationHistoryStore(); + } + } + + protected void handleWritingApplicationHistoryEvent( + WritingApplicationHistoryEvent event) { + switch (event.getType()) { + case WRITING_APP_START: + WritingApplicationStartEvent wasEvent = + (WritingApplicationStartEvent) event; + try { + writer.applicationStarted(wasEvent.getApplicationStartData()); + LOG.info("Stored the start data of application " + + wasEvent.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of application " + + wasEvent.getApplicationId()); + } + break; + case WRITING_APP_FINISH: + WritingApplicationFinishEvent wafEvent = + (WritingApplicationFinishEvent) event; + try { + writer.applicationFinished(wafEvent.getApplicationFinishData()); + LOG.info("Stored the finish data of application " + + wafEvent.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of application " + + wafEvent.getApplicationId()); + } + break; + case WRITING_APP_ATTEMPT_START: + WritingApplicationAttemptStartEvent waasEvent = + (WritingApplicationAttemptStartEvent) event; + try { + writer.applicationAttemptStarted(waasEvent + .getApplicationAttemptStartData()); + LOG.info("Stored the start data of application attempt " + + waasEvent.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of application attempt " + + waasEvent.getApplicationAttemptId()); + } + break; + case WRITING_APP_ATTEMPT_FINISH: + WritingApplicationAttemptFinishEvent waafEvent = + (WritingApplicationAttemptFinishEvent) event; + try { + writer.applicationAttemptFinished(waafEvent + .getApplicationAttemptFinishData()); + LOG.info("Stored the finish data of application attempt " + + waafEvent.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of application attempt " + + waafEvent.getApplicationAttemptId()); + } + break; + case WRITING_CONTAINER_START: + WritingContainerStartEvent wcsEvent = + (WritingContainerStartEvent) event; + try { + writer.containerStarted(wcsEvent.getContainerStartData()); + LOG.info("Stored the start data of container " + + wcsEvent.getContainerId()); + } catch (IOException e) { + LOG.error("Error when storing the start data of container " + + wcsEvent.getContainerId()); + } + break; + case WRITING_CONTAINER_FINISH: + WritingContainerFinishEvent wcfEvent = + (WritingContainerFinishEvent) event; + try { + writer.containerFinished(wcfEvent.getContainerFinishData()); + LOG.info("Stored the finish data of container " + + wcfEvent.getContainerId()); + } catch (IOException e) { + LOG.error("Error when storing the finish data of container " + + wcfEvent.getContainerId()); + } + break; + default: + LOG.error("Unknown WritingApplicationHistoryEvent type: " + + event.getType()); + } + } + + @SuppressWarnings("unchecked") + public void applicationStarted(RMApp app) { + dispatcher.getEventHandler().handle( + new WritingApplicationStartEvent(app.getApplicationId(), + ApplicationStartData.newInstance(app.getApplicationId(), + app.getName(), app.getApplicationType(), app.getQueue(), + app.getUser(), app.getSubmitTime(), app.getStartTime()))); + } + + @SuppressWarnings("unchecked") + public void applicationFinished(RMApp app) { + dispatcher.getEventHandler().handle( + new WritingApplicationFinishEvent(app.getApplicationId(), + ApplicationFinishData.newInstance(app.getApplicationId(), + app.getFinishTime(), + app.getDiagnostics().toString(), + app.getFinalApplicationStatus(), + app.createApplicationState()))); + } + + @SuppressWarnings("unchecked") + public void applicationAttemptStarted(RMAppAttempt appAttempt) { + dispatcher.getEventHandler().handle( + new WritingApplicationAttemptStartEvent(appAttempt.getAppAttemptId(), + ApplicationAttemptStartData.newInstance( + appAttempt.getAppAttemptId(), appAttempt.getHost(), + appAttempt.getRpcPort(), appAttempt.getMasterContainer() + .getId()))); + } + + @SuppressWarnings("unchecked") + public void applicationAttemptFinished(RMAppAttempt appAttempt) { + dispatcher.getEventHandler().handle( + new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), + ApplicationAttemptFinishData.newInstance(appAttempt + .getAppAttemptId(), appAttempt.getDiagnostics().toString(), + appAttempt.getTrackingUrl(), appAttempt + .getFinalApplicationStatus(), appAttempt + .createApplicationAttemptState()))); + } + + @SuppressWarnings("unchecked") + public void containerStarted(RMContainer container) { + // TODO: fill ContainerStartData with correct info + dispatcher.getEventHandler().handle( + new WritingContainerStartEvent(container.getContainerId(), + ContainerStartData.newInstance(container.getContainerId(), + container.getReservedResource(), container.getReservedNode(), + container.getReservedPriority(), System.currentTimeMillis()))); + } + + @SuppressWarnings("unchecked") + public void containerFinished(RMContainer container) { + // TODO: fill ContainerFinishData with correct info + dispatcher.getEventHandler().handle( + new WritingContainerFinishEvent(container.getContainerId(), + ContainerFinishData.newInstance(container.getContainerId(), + System.currentTimeMillis(), null, null, 0, + ContainerState.COMPLETE))); + } + + /** + * EventHandler implementation which forward events to HistoryWriter Making + * use of it, HistoryWriter can avoid to have a public handle method + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(WritingApplicationHistoryEvent event) { + handleWritingApplicationHistoryEvent(event); + } + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected static class HomoDispatcherVector + extends CompositeService implements Dispatcher { + + private List dispatchers = new ArrayList(); + + public HomoDispatcherVector(int num) { + super(HomoDispatcherVector.class.getName()); + for (int i = 0; i < num; ++i) { + Dispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (Dispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainingStop() { + // TODO: Need to call setDrainingStop of the dispatchers given YARN-1121 + // is finished + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = Math.abs(event.hashCode()) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + + } + + protected Dispatcher createDispatcher() { + return new AsyncDispatcher(); + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java new file mode 100644 index 0000000..10bfce2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptFinishEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; + +public class WritingApplicationAttemptFinishEvent extends + WritingApplicationHistoryEvent { + + private ApplicationAttemptId appAttemptId; + private ApplicationAttemptFinishData appAttemptFinish; + + public WritingApplicationAttemptFinishEvent( + ApplicationAttemptId appAttemptId, + ApplicationAttemptFinishData appAttemptFinish) { + super(WritingHistoryEventType.WRITING_APP_ATTEMPT_FINISH); + this.appAttemptId = appAttemptId; + this.appAttemptFinish = appAttemptFinish; + } + + @Override + public int hashCode() { + return appAttemptId.getApplicationId().hashCode(); + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } + + public ApplicationAttemptFinishData getApplicationAttemptFinishData() { + return appAttemptFinish; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java new file mode 100644 index 0000000..2b25ba2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationAttemptStartEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; + +public class WritingApplicationAttemptStartEvent extends + WritingApplicationHistoryEvent { + + private ApplicationAttemptId appAttemptId; + private ApplicationAttemptStartData appAttemptStart; + + public WritingApplicationAttemptStartEvent(ApplicationAttemptId appAttemptId, + ApplicationAttemptStartData appAttemptStart) { + super(WritingHistoryEventType.WRITING_APP_ATTEMPT_START); + this.appAttemptId = appAttemptId; + this.appAttemptStart = appAttemptStart; + } + + @Override + public int hashCode() { + return appAttemptId.getApplicationId().hashCode(); + } + + public ApplicationAttemptId getApplicationAttemptId() { + return appAttemptId; + } + + public ApplicationAttemptStartData getApplicationAttemptStartData() { + return appAttemptStart; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java new file mode 100644 index 0000000..1429d2c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationFinishEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; + +public class WritingApplicationFinishEvent extends + WritingApplicationHistoryEvent { + + private ApplicationId appId; + private ApplicationFinishData appFinish; + + public WritingApplicationFinishEvent(ApplicationId appId, + ApplicationFinishData appFinish) { + super(WritingHistoryEventType.WRITING_APP_FINISH); + this.appId = appId; + this.appFinish = appFinish; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationFinishData getApplicationFinishData() { + return appFinish; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java new file mode 100644 index 0000000..bc17edc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationHistoryEvent.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class WritingApplicationHistoryEvent extends + AbstractEvent { + + public WritingApplicationHistoryEvent(WritingHistoryEventType type) { + super(type); + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java new file mode 100644 index 0000000..f8790e7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingApplicationStartEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; + +public class WritingApplicationStartEvent extends + WritingApplicationHistoryEvent { + + private ApplicationId appId; + private ApplicationStartData appStart; + + public WritingApplicationStartEvent(ApplicationId appId, + ApplicationStartData appStart) { + super(WritingHistoryEventType.WRITING_APP_START); + this.appId = appId; + this.appStart = appStart; + } + + @Override + public int hashCode() { + return appId.hashCode(); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public ApplicationStartData getApplicationStartData() { + return appStart; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java new file mode 100644 index 0000000..ec455fd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerFinishEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; + +public class WritingContainerFinishEvent extends WritingApplicationHistoryEvent { + + private ContainerId containerId; + private ContainerFinishData containerFinish; + + public WritingContainerFinishEvent(ContainerId containerId, + ContainerFinishData containerFinish) { + super(WritingHistoryEventType.WRITING_CONTAINER_FINISH); + this.containerId = containerId; + this.containerFinish = containerFinish; + } + + @Override + public int hashCode() { + return containerId.getApplicationAttemptId().getApplicationId().hashCode(); + } + + public ContainerId getContainerId() { + return containerId; + } + + public ContainerFinishData getContainerFinishData() { + return containerFinish; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java new file mode 100644 index 0000000..90402f0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingContainerStartEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +public class WritingContainerStartEvent extends WritingApplicationHistoryEvent { + + private ContainerId containerId; + private ContainerStartData containerStart; + + public WritingContainerStartEvent(ContainerId containerId, + ContainerStartData containerStart) { + super(WritingHistoryEventType.WRITING_CONTAINER_START); + this.containerId = containerId; + this.containerStart = containerStart; + } + + @Override + public int hashCode() { + return containerId.getApplicationAttemptId().getApplicationId().hashCode(); + } + + public ContainerId getContainerId() { + return containerId; + } + + public ContainerStartData getContainerStartData() { + return containerStart; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java new file mode 100644 index 0000000..45ffa4a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/WritingHistoryEventType.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + + +public enum WritingHistoryEventType { + WRITING_APP_START, + WRITING_APP_FINISH, + WRITING_APP_ATTEMPT_START, + WRITING_APP_ATTEMPT_FINISH, + WRITING_CONTAINER_START, + WRITING_CONTAINER_FINISH +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index a8a0af4..df4a13d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -54,10 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -282,6 +280,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.writeLock = lock.writeLock(); this.stateMachine = stateMachineFactory.make(this); + + rmContext.getRMApplicationHistoryWriter().applicationStarted(this); } @Override @@ -712,6 +712,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL)); + // In this case, RMApp will enter the final state before RMAppAttempt + // Therefore, notify not to write the finish data immediately and let + // RMAppAttempt to write the finish data first + super.writeAppFinished = false; super.transition(app, event); } } @@ -726,6 +730,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class FinalTransition extends RMAppTransition { + protected boolean writeAppFinished = true; + private Set getNodesOnWhichAttemptRan(RMAppImpl app) { Set nodes = new HashSet(); for (RMAppAttempt attempt : app.attempts.values()) { @@ -747,6 +753,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { // application completely done and remove from state store. app.removeApplicationState(); + if (writeAppFinished) { + app.rmContext.getRMApplicationHistoryWriter() + .applicationFinished(app); + } + app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 335dbda..3a666dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -178,4 +179,21 @@ * @return the start time of the application. */ long getStartTime(); + + /** + * The current state of the {@link RMAppAttempt}. + * + * @return the current state {@link RMAppAttemptState} for this application + * attempt. + */ + RMAppAttemptState getState(); + + /** + * Create the external user-facing state of the attempt of ApplicationMaster + * from the current state of the {@link RMAppAttempt}. + * + * @return the external user-facing state of the attempt ApplicationMaster. + */ + YarnApplicationAttemptState createApplicationAttemptState(); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index f68a4a5..8d8275c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -40,7 +40,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; @@ -773,6 +775,9 @@ public void transition(RMAppAttemptImpl appAttempt, ); appAttempt.removeCredentials(appAttempt); + + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationAttemptFinished(appAttempt); } } @@ -910,6 +915,17 @@ public void transition(RMAppAttemptImpl appAttempt, finalAttemptState)); appAttempt.removeCredentials(appAttempt); + + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationAttemptFinished(appAttempt); + // In this case, we need to record the finish data or RMApp as well, + // because it will not write the data there + if (finalAttemptState == RMAppAttemptState.KILLED) { + RMApp app = appAttempt.rmContext.getRMApps().get( + appAttemptId.getApplicationId()); + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationFinished(app); + } } } @@ -1007,6 +1023,9 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.eventHandler.handle(new RMAppEvent(appAttempt .getAppAttemptId().getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED)); + + appAttempt.rmContext.getRMApplicationHistoryWriter() + .applicationAttemptStarted(appAttempt); } } @@ -1245,6 +1264,23 @@ public long getStartTime() { } } + @Override + public RMAppAttemptState getState() { + this.readLock.lock(); + + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + @Override + public YarnApplicationAttemptState createApplicationAttemptState() { + RMAppAttemptState state = getState(); + return RMServerUtils.createApplicationAttemptState(state); + } + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index d44fd3f..dd22618 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -133,6 +134,7 @@ private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; private final Container container; + private final RMContext rmContext; private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; @@ -140,17 +142,18 @@ private NodeId reservedNode; private Priority reservedPriority; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, - EventHandler handler, - ContainerAllocationExpirer containerAllocationExpirer) { + RMContext rmContext) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; this.container = container; this.appAttemptId = appAttemptId; - this.eventHandler = handler; - this.containerAllocationExpirer = containerAllocationExpirer; + this.rmContext = rmContext; + this.eventHandler = rmContext.getDispatcher().getEventHandler(); + this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -256,6 +259,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) { container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent( container.appAttemptId, container.container)); + + container.rmContext.getRMApplicationHistoryWriter() + .containerStarted(container); } } @@ -291,6 +297,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + + container.rmContext.getRMApplicationHistoryWriter() + .containerFinished(container); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index ade40c1..84c4206 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -231,8 +231,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority, if (rmContainer == null) { rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), rmContext.getDispatcher().getEventHandler(), - rmContext.getContainerAllocationExpirer()); + node.getNodeID(), rmContext); Resources.addTo(currentReservation, container.getResource()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 7f51126..212150f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -121,9 +121,7 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, this - .getApplicationAttemptId(), node.getNodeID(), this.rmContext - .getDispatcher().getEventHandler(), this.rmContext - .getContainerAllocationExpirer()); + .getApplicationAttemptId(), node.getNodeID(), this.rmContext); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index caf2a97..7a23e76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -272,9 +272,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, - getApplicationAttemptId(), node.getNodeID(), rmContext - .getDispatcher().getEventHandler(), rmContext - .getContainerAllocationExpirer()); + getApplicationAttemptId(), node.getNodeID(), rmContext); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6698412..414dca0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -99,9 +100,10 @@ public static RMContext mockRMContext(int n, long time) { rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( rmDispatcher); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); return new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, null, null, null, null) { + null, null, null, null, null, writer) { @Override public ConcurrentMap getRMApps() { return map; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 9ec82c4..f8f763a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -18,19 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; -import java.util.HashSet; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.BrokenBarrierException; @@ -47,10 +47,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; @@ -78,6 +77,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -510,6 +510,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) .thenReturn(queInfo); when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) .thenThrow(new IOException("queue does not exist")); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); ConcurrentHashMap apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index c060bb6..2ecb994 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -85,7 +85,7 @@ public void setUp() throws Exception { rmContext = new RMContextImpl(rmDispatcher, null, null, null, - mock(DelegationTokenRenewer.class), null, null, null, null); + mock(DelegationTokenRenewer.class), null, null, null, null, null); scheduler = mock(YarnScheduler.class); doAnswer( new Answer() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java new file mode 100644 index 0000000..7bca0f9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.ahs; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestRMApplicationHistoryWriter { + + private static int MAX_RETRIES = 10; + + private RMApplicationHistoryWriter writer; + private ApplicationHistoryStore store; + private List dispatchers = + new ArrayList(); + + @Before + public void setup() { + store = new MemoryApplicationHistoryStore(); + Configuration conf = new Configuration(); + writer = new RMApplicationHistoryWriter() { + + @Override + protected ApplicationHistoryStore createApplicationHistoryStore( + Configuration conf) { + return store; + } + + @Override + protected Dispatcher createDispatcher(Configuration conf) { + HomoDispatcherVector dispatcher = new HomoDispatcherVector(conf.getInt( + YarnConfiguration.RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE, + YarnConfiguration.DEFAULT_RM_AHS_WRITER_COMPSITE_DISPATCHER_SIZE)); + dispatcher.setDrainingStop(); + return dispatcher; + } + + class HomoDispatcherVector extends + RMApplicationHistoryWriter.HomoDispatcherVector { + + public HomoDispatcherVector(int num) { + super(num); + } + + @Override + protected Dispatcher createDispatcher() { + CounterDispatcher dispatcher = new CounterDispatcher(); + dispatchers.add(dispatcher); + return dispatcher; + } + + } + }; + writer.init(conf); + writer.start(); + } + + @After + public void tearDown() { + writer.stop(); + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMApp.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("test user"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(0L); + when(app.getStartTime()).thenReturn(1L); + when(app.getFinishTime()).thenReturn(2L); + when(app.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(app.createApplicationState()) + .thenReturn(YarnApplicationState.FINISHED); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()).thenReturn( + ContainerId.newInstance(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test url"); + when(appAttempt.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(appAttempt.createApplicationAttemptState()).thenReturn( + YarnApplicationAttemptState.FINISHED); + return appAttempt; + } + + private static RMContainer createRMContainer( + ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getReservedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getReservedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getReservedPriority()).thenReturn(Priority.UNDEFINED); + return container; + } + + @Test + public void testWriteApplication() throws Exception { + RMApp app = createRMApp(ApplicationId.newInstance(0, 1)); + + writer.applicationStarted(app); + ApplicationHistoryData appHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + appHD = store.getApplication(ApplicationId.newInstance(0, 1)); + if (appHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(appHD); + Assert.assertEquals("test app", appHD.getApplicationName()); + Assert.assertEquals("test app type", appHD.getApplicationType()); + Assert.assertEquals("test user", appHD.getUser()); + Assert.assertEquals("test queue", appHD.getQueue()); + Assert.assertEquals(0L, appHD.getSubmitTime()); + Assert.assertEquals(1L, appHD.getStartTime()); + + writer.applicationFinished(app); + for (int i = 0; i < MAX_RETRIES; ++i) { + appHD = store.getApplication(ApplicationId.newInstance(0, 1)); + if (appHD.getYarnApplicationState() != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertEquals(2L, appHD.getFinishTime()); + Assert.assertEquals("test diagnostics info", appHD.getDiagnosticsInfo()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + appHD.getFinalApplicationStatus()); + Assert.assertEquals(YarnApplicationState.FINISHED, + appHD.getYarnApplicationState()); + } + + @Test + public void testWriteApplicationAttempt() throws Exception { + RMAppAttempt appAttempt = createRMAppAttempt( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + writer.applicationAttemptStarted(appAttempt); + ApplicationAttemptHistoryData appAttemptHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + appAttemptHD = + store.getApplicationAttempt(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + if (appAttemptHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(appAttemptHD); + Assert.assertEquals("test host", appAttemptHD.getHost()); + Assert.assertEquals(-100, appAttemptHD.getRPCPort()); + Assert.assertEquals( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1), + appAttemptHD.getMasterContainerId()); + + writer.applicationAttemptFinished(appAttempt); + for (int i = 0; i < MAX_RETRIES; ++i) { + appAttemptHD = + store.getApplicationAttempt(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1)); + if (appAttemptHD.getYarnApplicationAttemptState() != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertEquals("test diagnostics info", + appAttemptHD.getDiagnosticsInfo()); + Assert.assertEquals("test url", appAttemptHD.getTrackingURL()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + appAttemptHD.getFinalApplicationStatus()); + Assert.assertEquals(YarnApplicationAttemptState.FINISHED, + appAttemptHD.getYarnApplicationAttemptState()); + } + + @Test + public void testWriteContainer() throws Exception { + // TODO: the test case need to be improved in YARN-974 + RMContainer container = createRMContainer( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + writer.containerStarted(container); + ContainerHistoryData containerHD = null; + for (int i = 0; i < MAX_RETRIES; ++i) { + containerHD = + store.getContainer(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + if (containerHD != null) { + break; + } else { + Thread.sleep(100); + } + } + Assert.assertNotNull(containerHD); + Assert.assertEquals(NodeId.newInstance("test host", -100), + containerHD.getAssignedNode()); + Assert.assertEquals(Resource.newInstance(-1, -1), + containerHD.getAllocatedResource()); + Assert.assertEquals(Priority.UNDEFINED, containerHD.getPriority()); + + writer.containerFinished(container); + for (int i = 0; i < MAX_RETRIES; ++i) { + containerHD = + store.getContainer(ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1)); + if (containerHD.getContainerState() != null) { + break; + } else { + Thread.sleep(100); + } + Assert.assertEquals(ContainerState.COMPLETE, + containerHD.getContainerState()); + } + } + + @Test + public void testParallelWrite() throws Exception { + List appIds = new ArrayList(); + for (int i = 0; i < 10; ++i) { + Random rand = new Random(i); + ApplicationId appId = ApplicationId.newInstance(0, rand.nextInt()); + appIds.add(appId); + RMApp app = createRMApp(appId); + writer.applicationStarted(app); + for (int j = 1; j <= 10; ++j) { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, j); + RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + writer.applicationAttemptStarted(appAttempt); + for (int k = 1; k <= 10; ++k) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, k); + RMContainer container = createRMContainer(containerId); + writer.containerStarted(container); + writer.containerFinished(container); + } + writer.applicationAttemptFinished(appAttempt); + } + writer.applicationFinished(app); + } + for (int i = 0; i < MAX_RETRIES; ++i) { + if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) { + break; + } else { + Thread.sleep(500); + } + } + Assert.assertTrue(allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)); + // Validate all events of one application are handled by one dispatcher + for (ApplicationId appId : appIds) { + Assert.assertTrue(handledByOne(appId)); + } + } + + private boolean allEventsHandled(int expected) { + int actual = 0; + for (CounterDispatcher dispatcher : dispatchers) { + for (Integer count : dispatcher.counts.values()) { + actual += count; + } + } + return actual == expected; + } + + private boolean handledByOne(ApplicationId appId) { + int count = 0; + for (CounterDispatcher dispatcher : dispatchers) { + if (dispatcher.counts.containsKey(appId)) { + ++count; + } + } + return count == 1; + } + + private static class CounterDispatcher extends AsyncDispatcher { + + private Map counts = + new HashMap(); + + @SuppressWarnings("rawtypes") + @Override + protected void dispatch(Event event) { + if (event instanceof WritingApplicationHistoryEvent) { + WritingApplicationHistoryEvent ashEvent = + (WritingApplicationHistoryEvent) event; + switch (ashEvent.getType()) { + case WRITING_APP_START: + increment(((WritingApplicationStartEvent) event).getApplicationId()); + break; + case WRITING_APP_FINISH: + increment(((WritingApplicationFinishEvent) event) + .getApplicationId()); + break; + case WRITING_APP_ATTEMPT_START: + increment(((WritingApplicationAttemptStartEvent) event) + .getApplicationAttemptId().getApplicationId()); + break; + case WRITING_APP_ATTEMPT_FINISH: + increment(((WritingApplicationAttemptFinishEvent) event) + .getApplicationAttemptId().getApplicationId()); + break; + case WRITING_CONTAINER_START: + increment(((WritingContainerStartEvent) event).getContainerId() + .getApplicationAttemptId().getApplicationId()); + break; + case WRITING_CONTAINER_FINISH: + increment(((WritingContainerFinishEvent) event).getContainerId() + .getApplicationAttemptId().getApplicationId()); + break; + } + } + super.dispatch(event); + } + + private void increment(ApplicationId appId) { + Integer val = counts.get(appId); + if (val == null) { + counts.put(appId, 1); + } else { + counts.put(appId, val + 1); + } + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java index a884552..756bf45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java @@ -72,7 +72,7 @@ public void setUp() { // Dispatcher that processes events inline Dispatcher dispatcher = new InlineDispatcher(); RMContext context = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null); dispatcher.register(SchedulerEventType.class, new InlineDispatcher.EmptyEventHandler()); dispatcher.register(RMNodeEventType.class, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index ddb7a90..4f94695 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -71,7 +71,7 @@ public void handle(Event event) { RMContext context = new RMContextImpl(dispatcher, null, null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), null); + new NMTokenSecretManagerInRM(conf), null, null); dispatcher.register(RMNodeEventType.class, new ResourceManager.NodeEventDispatcher(context)); NodesListManager nodesListManager = new NodesListManager(context); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2075921..ad56708 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -22,6 +22,7 @@ import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -92,6 +94,7 @@ private static int appId = 1; private DrainDispatcher rmDispatcher; private RMStateStore store; + private RMApplicationHistoryWriter writer; private YarnScheduler scheduler; // ignore all the RM application attempt events @@ -187,13 +190,15 @@ public void setUp() throws Exception { AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); + writer = mock(RMApplicationHistoryWriter.class); this.rmContext = new RMContextImpl(rmDispatcher, store, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new AMRMTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM()); + new ClientToAMTokenSecretManagerInRM(), + writer); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext)); @@ -326,6 +331,7 @@ private static void assertFailed(RMApp application, String regex) { protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = createNewTestApp(submissionContext); + verify(writer).applicationStarted(any(RMApp.class)); // NEW => NEW_SAVING event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); @@ -447,6 +453,9 @@ public void testUnmanagedApp() throws IOException { Assert.assertTrue("Finished app missing diagnostics", application.getDiagnostics().indexOf(diagMsg) != -1); + // reset the counter of Mockito.verify + reset(writer); + // test app fails after 1 app attempt failure LOG.info("--- START: testUnmanagedAppFailPath ---"); application = testCreateAppRunning(subContext); @@ -486,6 +495,7 @@ public void testAppNewKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -500,6 +510,7 @@ public void testAppNewReject() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -513,6 +524,7 @@ public void testAppNewSavingKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -527,6 +539,7 @@ public void testAppNewSavingReject() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -541,6 +554,7 @@ public void testAppSubmittedRejected() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, rejectedText); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -554,6 +568,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { rmDispatcher.await(); assertKilled(application); assertAppAndAttemptKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -586,6 +601,7 @@ public void testAppAcceptedFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*" + message + ".*Failing the application.*"); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -599,6 +615,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { rmDispatcher.await(); assertKilled(application); assertAppAndAttemptKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -612,6 +629,7 @@ public void testAppRunningKill() throws IOException { application.handle(event); rmDispatcher.await(); assertKilled(application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -663,6 +681,7 @@ public void testAppRunningFailed() throws IOException { application.handle(event); rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -675,6 +694,7 @@ public void testAppRemovingFinished() throws IOException { application.handle(finishedEvent); rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -687,6 +707,7 @@ public void testAppRemovingKilled() throws IOException { application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.KILLED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -700,6 +721,7 @@ public void testAppFinishingKill() throws IOException { application.handle(event); rmDispatcher.await(); assertAppState(RMAppState.FINISHED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -717,6 +739,7 @@ public void testAppFinishedFinished() throws IOException { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "", diag.toString()); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -747,6 +770,7 @@ public void testAppFailedFailed() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -802,6 +826,7 @@ public void testAppKilledKilled() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -825,6 +850,9 @@ public void testClientTokens() throws Exception { report = app.createAndGetApplicationReport("clientuser", true); Assert.assertNull(report.getClientToAMToken()); + // reset the counter of Mockito.verify + reset(writer); + app = testCreateAppRunning(null); rmDispatcher.await(); assertAppState(RMAppState.RUNNING, app); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 1f3c506..00ea49c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; @@ -114,6 +115,7 @@ private ApplicationMasterLauncher applicationMasterLauncher; private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; + private RMApplicationHistoryWriter writer; private RMApp application; private RMAppAttempt applicationAttempt; @@ -201,13 +203,15 @@ public void setUp() throws Exception { mock(ContainerAllocationExpirer.class); amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); + writer = mock(RMApplicationHistoryWriter.class); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, amRMTokenManager, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - clientToAMTokenManager); + clientToAMTokenManager, + writer); RMStateStore store = mock(RMStateStore.class); ((RMContextImpl) rmContext).setStateStore(store); @@ -363,6 +367,7 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); } /** @@ -439,6 +444,7 @@ private void testAppAttemptFailedState(Container container, verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); } /** @@ -473,6 +479,7 @@ private void testAppAttemptRunningState(Container container, assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); } + verify(writer).applicationAttemptStarted(any(RMAppAttempt.class)); // TODO - need to add more checks relevant to this state } @@ -518,6 +525,7 @@ private void testAppAttemptFinishedState(Container container, assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index d1262d8..e2646c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -19,9 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -34,6 +37,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -50,7 +55,6 @@ public void testReleaseWhileRunning() { DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); EventHandler appAttemptEventHandler = mock(EventHandler.class); EventHandler generic = mock(EventHandler.class); drainDispatcher.register(RMAppAttemptEventType.class, @@ -71,8 +75,13 @@ public void testReleaseWhileRunning() { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, eventHandler, expirer); + nodeId, rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); @@ -80,6 +89,7 @@ public void testReleaseWhileRunning() { RMContainerEventType.START)); drainDispatcher.await(); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); + verify(writer).containerStarted(any(RMContainer.class)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -100,6 +110,7 @@ public void testReleaseWhileRunning() { containerStatus, RMContainerEventType.RELEASED)); drainDispatcher.await(); assertEquals(RMContainerState.RELEASED, rmContainer.getState()); + verify(writer).containerFinished(any(RMContainer.class)); ArgumentCaptor captor = ArgumentCaptor .forClass(RMAppAttemptContainerFinishedEvent.class); @@ -120,7 +131,6 @@ public void testReleaseWhileRunning() { public void testExpireWhileRunning() { DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); EventHandler appAttemptEventHandler = mock(EventHandler.class); EventHandler generic = mock(EventHandler.class); drainDispatcher.register(RMAppAttemptEventType.class, @@ -141,8 +151,13 @@ public void testExpireWhileRunning() { Container container = BuilderUtils.newContainer(containerId, nodeId, "host:3465", resource, priority, null); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, eventHandler, expirer); + nodeId, rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); @@ -150,6 +165,7 @@ public void testExpireWhileRunning() { RMContainerEventType.START)); drainDispatcher.await(); assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); + verify(writer).containerStarted(any(RMContainer.class)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -170,5 +186,6 @@ public void testExpireWhileRunning() { containerStatus, RMContainerEventType.EXPIRE)); drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + verify(writer, never()).containerFinished(any(RMContainer.class)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 627fae8..4fd083b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -345,7 +345,7 @@ public void testRefreshQueues() throws Exception { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); @@ -444,7 +444,7 @@ public void testParseQueue() throws IOException { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); } @Test @@ -457,7 +457,7 @@ public void testReconnectedNode() throws Exception { cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(csConf), new NMTokenSecretManagerInRM(csConf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); @@ -484,7 +484,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); // Add a new queue b4 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 3c55b42..588e320 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -248,14 +247,16 @@ public void testSortedQueues() throws Exception { ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); DrainDispatcher drainDispatcher = new DrainDispatcher(); - EventHandler eventHandler = drainDispatcher.getEventHandler(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( app_0.getApplicationId(), 1); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); Container container=TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priority); RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - node_0.getNodeID(), eventHandler, expirer); + node_0.getNodeID(), rmContext); // Assign {1,2,3,4} 1GB containers respectively to queues stubQueueAllocation(a, clusterResource, node_0, 1*GB); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index c86d6b3..21c446a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -47,7 +47,7 @@ public void testQueueParsing() throws Exception { capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index b974528..db28dca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -84,12 +85,13 @@ public EventHandler getEventHandler() { new ContainerAllocationExpirer(nullDispatcher); Configuration conf = new Configuration(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(nullDispatcher, cae, null, null, null, new AMRMTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM()); + new ClientToAMTokenSecretManagerInRM(), writer); return rmContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index ee302b9..04b4d4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; +import static org.mockito.Mockito.mock; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -129,8 +132,9 @@ public void testFifoSchedulerCapacityWhenNoNMs() { @Test(timeout=5000) public void testAppAttemptMetrics() throws Exception { AsyncDispatcher dispatcher = new InlineDispatcher(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null, null, writer); FifoScheduler schedular = new FifoScheduler(); schedular.reinitialize(new Configuration(), rmContext); @@ -162,8 +166,9 @@ public void testNodeLocalAssignment() throws Exception { NMTokenSecretManagerInRM nmTokenSecretManager = new NMTokenSecretManagerInRM(conf); nmTokenSecretManager.rollMasterKey(); + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, - null, containerTokenSecretManager, nmTokenSecretManager, null); + null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(); scheduler.reinitialize(new Configuration(), rmContext); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index aa2d6c6..97cc495 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -161,7 +161,7 @@ public static RMContext mockRMContext(int numApps, int racks, int numNodes, deactivatedNodesMap.put(node.getHostName(), node); } return new RMContextImpl(null, null, null, null, - null, null, null, null, null) { + null, null, null, null, null, null) { @Override public ConcurrentMap getRMApps() { return applicationsMaps; @@ -204,7 +204,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + new ClientToAMTokenSecretManagerInRM(), null)); return cs; }