diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index f1ebbfe..0b8b1c2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.records;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -26,6 +28,8 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -90,7 +94,7 @@ public static ApplicationSubmissionContext newInstance(
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
context.setNodeLabelExpression(appLabelExpression);
context.setResource(resource);
-
+
ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
amReq.setResourceName(ResourceRequest.ANY);
amReq.setCapability(resource);
@@ -100,7 +104,7 @@ public static ApplicationSubmissionContext newInstance(
context.setAMContainerResourceRequest(amReq);
return context;
}
-
+
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
@@ -120,8 +124,8 @@ public static ApplicationSubmissionContext newInstance(
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
- amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
- resource, applicationType, false, null, null);
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, false, null, null);
}
@Public
@@ -132,10 +136,10 @@ public static ApplicationSubmissionContext newInstance(
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource) {
return newInstance(applicationId, applicationName, queue, priority,
- amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
- resource, null);
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, null);
}
-
+
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
@@ -169,8 +173,9 @@ public static ApplicationSubmissionContext newInstance(
boolean keepContainers, long attemptFailuresValidityInterval) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
- amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
- resource, applicationType, keepContainers);
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete,
+ maxAppAttempts,
+ resource, applicationType, keepContainers);
context.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
return context;
}
@@ -185,11 +190,26 @@ public static ApplicationSubmissionContext newInstance(
boolean keepContainers, LogAggregationContext logAggregationContext) {
ApplicationSubmissionContext context =
newInstance(applicationId, applicationName, queue, priority,
- amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
- resource, applicationType, keepContainers);
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete,
+ maxAppAttempts,
+ resource, applicationType, keepContainers);
context.setLogAggregationContext(logAggregationContext);
return context;
}
+
+ @Private
+ public Credentials parseCredentials() throws IOException {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ ByteBuffer tokens = getAMContainerSpec().getTokens();
+ if(tokens!=null) {
+ dibb.reset(tokens);
+ credentials.readTokenStorageStream(dibb);
+ tokens.rewind();
+ }
+ return credentials;
+ }
+
/**
* Get the ApplicationId of the submitted application.
* @return ApplicationId of the submitted application
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java
new file mode 100644
index 0000000..14873dd
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/QueueNotFoundException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+public class QueueNotFoundException extends YarnRuntimeException {
+
+ public QueueNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 63333b8..5cf1320 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.QueueNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -274,12 +275,11 @@ protected void submitApplication(
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
- Credentials credentials = null;
try {
- credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
- credentials, submissionContext.getCancelTokensWhenComplete(),
- application.getUser());
+ submissionContext.parseCredentials(),
+ submissionContext.getCancelTokensWhenComplete(),
+ application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
@@ -312,31 +312,16 @@ protected void submitApplication(
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
application.recover(rmState);
- if (isApplicationInFinalState(appState.getState())) {
- // We are synchronously moving the application into final state so that
- // momentarily client will not see this application in NEW state. Also
- // for finished applications we will avoid renewing tokens.
- application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
- return;
- }
- if (UserGroupInformation.isSecurityEnabled()) {
- Credentials credentials = null;
- try {
- credentials = parseCredentials(appContext);
- // synchronously renew delegation token on recovery.
- rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
- credentials, appContext.getCancelTokensWhenComplete(),
- application.getUser());
- application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
- } catch (Exception e) {
- LOG.warn("Unable to parse and renew delegation tokens.", e);
+ try {
+ application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
+ } catch (Exception e) {
+ LOG.warn("Failed to recover application.", e);
+ if (!isApplicationInFinalState(appState.getState())) {
this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(appId, e.getMessage()));
- throw e;
+ .handle(new RMAppRejectedEvent(appId, e.getMessage()));
}
- } else {
- application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
+ throw e;
}
}
@@ -426,19 +411,7 @@ private boolean isApplicationInFinalState(RMAppState rmAppState) {
}
}
- protected Credentials parseCredentials(ApplicationSubmissionContext application)
- throws IOException {
- Credentials credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- ByteBuffer tokens = application.getAMContainerSpec().getTokens();
- if (tokens != null) {
- dibb.reset(tokens);
- credentials.readTokenStorageStream(dibb);
- tokens.rewind();
- }
- return credentials;
- }
-
+ @SuppressWarnings("unchecked")
@Override
public void recover(RMState state) throws Exception {
RMStateStore store = rmContext.getStateStore();
@@ -447,7 +420,14 @@ public void recover(RMState state) throws Exception {
Map appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
- recoverApplication(appState, state);
+ try {
+ recoverApplication(appState, state);
+ } catch (QueueNotFoundException qnfe) {
+ // Trying to recover app corresponding to a missing queue
+ throw qnfe;
+ } catch (Exception e) {
+ LOG.error("Error recovering application " + appState, e);
+ }
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 1994b36..2afe678 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -228,6 +229,11 @@
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppState.FAILED))
+
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -825,6 +831,19 @@ private void recoverAppAttempts() {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // synchronously renew delegation token on recovery.
+ try {
+ app.rmContext.getDelegationTokenRenewer().addApplicationSync(
+ app.getApplicationId(),
+ app.submissionContext.parseCredentials(),
+ app.submissionContext.getCancelTokensWhenComplete(),
+ app.getUser());
+ } catch (Exception e) {
+ return RMAppState.FAILED;
+ }
+ }
+
// The app has completed.
if (app.recoveredFinalState != null) {
app.recoverAppAttempts();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index b5a6237..ae11b07 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -833,8 +833,10 @@ private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
if (UserGroupInformation.isSecurityEnabled()) {
byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey(
RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
- clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
- .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
+ if (clientTokenMasterKeyBytes != null) {
+ clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
+ .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
+ }
}
this.amrmToken =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9332228..5f1f645 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.QueueNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -676,15 +677,13 @@ private synchronized void addApplication(ApplicationId applicationId,
//During a restart, this indicates a queue was removed, which is
//not presently supported
if (isAppRecovering) {
- //throwing RuntimeException because some other exceptions are caught
- //(including YarnRuntimeException) and we want this to force an exit
- String queueErrorMsg = "Queue named " + queueName
+ String queueErrorMsg = "Queue named " + queueName
+ " missing during application recovery."
+ " Queue removal during recovery is not presently supported by the"
+ " capacity scheduler, please restart with all queues configured"
+ " which were present before shutdown/restart.";
LOG.fatal(queueErrorMsg);
- throw new RuntimeException(queueErrorMsg);
+ throw new QueueNotFoundException(queueErrorMsg);
}
String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 9d0ac27..9f127fb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -354,14 +354,6 @@ public RMApp submitApp(int masterMemory, String name, String user,
LogAggregationContext logAggregationContext)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
- ApplicationClientProtocol client = getClientRMService();
- if (! isAppIdProvided) {
- GetNewApplicationResponse resp = client.getNewApplication(Records
- .newRecord(GetNewApplicationRequest.class));
- appId = resp.getApplicationId();
- }
- SubmitApplicationRequest req = Records
- .newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
.newRecord(ApplicationSubmissionContext.class);
sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
@@ -392,31 +384,47 @@ public RMApp submitApp(int masterMemory, String name, String user,
if (logAggregationContext != null) {
sub.setLogAggregationContext(logAggregationContext);
}
+
+ return submitApp(user, sub, waitForAccepted);
+ }
+
+ public RMApp submitApp(String user, ApplicationSubmissionContext sub,
+ boolean waitForAccepted) throws Exception {
+ ApplicationClientProtocol client = getClientRMService();
+ if (sub.getApplicationId() == null) {
+ GetNewApplicationResponse resp = client.getNewApplication(Records
+ .newRecord(GetNewApplicationRequest.class));
+ sub.setApplicationId(resp.getApplicationId());
+ }
+ ApplicationId appId = sub.getApplicationId();
+
+ SubmitApplicationRequest req = Records
+ .newRecord(SubmitApplicationRequest.class);
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
- UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
+ UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
PrivilegedAction action =
- new PrivilegedAction() {
- ApplicationClientProtocol client;
- SubmitApplicationRequest req;
- @Override
- public SubmitApplicationResponse run() {
- try {
- return client.submitApplication(req);
- } catch (YarnException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
- PrivilegedAction setClientReq(
- ApplicationClientProtocol client, SubmitApplicationRequest req) {
- this.client = client;
- this.req = req;
- return this;
- }
- }.setClientReq(client, req);
+ new PrivilegedAction() {
+ ApplicationClientProtocol client;
+ SubmitApplicationRequest req;
+ @Override
+ public SubmitApplicationResponse run() {
+ try {
+ return client.submitApplication(req);
+ } catch (YarnException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ PrivilegedAction setClientReq(
+ ApplicationClientProtocol client, SubmitApplicationRequest req) {
+ this.client = client;
+ this.req = req;
+ return this;
+ }
+ }.setClientReq(client, req);
fakeUser.doAs(action);
// make sure app is immediately available after submit
if (waitForAccepted) {
@@ -425,7 +433,7 @@ public SubmitApplicationResponse run() {
RMApp rmApp = getRMContext().getRMApps().get(appId);
// unmanaged AM won't go to RMAppAttemptState.SCHEDULED.
- if (waitForAccepted && !unmanaged) {
+ if (waitForAccepted && !sub.getUnmanagedAM()) {
waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java
new file mode 100644
index 0000000..7c4ab8d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAppManager.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.QueueNotFoundException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestRMAppManager {
+ private MockRM rm;
+ private RMAppManager appManager;
+
+ @Test (expected = QueueNotFoundException.class)
+ public void testRecoveryFailureWithHA() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
+ conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+ conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), "0.0.0.0");
+ conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), "0.0.0.0");
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ for (int i = 2; i > 0; i--) {
+ ApplicationState appState = mock(ApplicationState.class);
+ when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
+ memStore.getState().getApplicationState()
+ .put(appState.getAppId(), appState);
+ }
+
+ rm = new MockRM(conf, memStore);
+ rm.start();
+ rm.transitionToActive();
+
+ appManager = rm.getRMAppManager();
+ RMAppManager appManagerSpy = spy(appManager);
+
+ // Check recovery behavior on Exceptions
+ doThrow(new InvalidBlockTokenException()).when(appManagerSpy).
+ recoverApplication(
+ any(ApplicationState.class), any(RMStateStore.RMState.class));
+ appManagerSpy.recover(memStore.getState());
+ assertEquals("RM is not active any more",
+ HAServiceProtocol.HAServiceState.ACTIVE,
+ rm.getRMContext().getHAServiceState());
+
+ // Check recovery behavior on ConnectException
+ doThrow(new QueueNotFoundException("Missing queue"))
+ .when(appManagerSpy).recoverApplication(
+ any(ApplicationState.class), any(RMStateStore.RMState.class));
+ appManagerSpy.recover(memStore.getState());
+ }
+
+ @Test (expected = QueueNotFoundException.class)
+ public void testRecoveryFailure() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ for (int i = 2; i > 0; i--) {
+ ApplicationState appState = mock(ApplicationState.class);
+ when(appState.getAppId()).thenReturn(ApplicationId.newInstance(1234, i));
+ memStore.getState().getApplicationState()
+ .put(appState.getAppId(), appState);
+ }
+
+ rm = new MockRM(conf, memStore);
+ rm.start();
+ rm.transitionToActive();
+
+ appManager = rm.getRMAppManager();
+ RMAppManager appManagerSpy = spy(appManager);
+
+ // Check recovery behavior on Exceptions
+ doThrow(new InvalidBlockTokenException()).when(appManagerSpy).
+ recoverApplication(
+ any(ApplicationState.class), any(RMStateStore.RMState.class));
+ appManagerSpy.recover(memStore.getState());
+ assertEquals("RM is not running any more",
+ Service.STATE.STARTED, rm.getServiceState());
+
+ // Check recovery behavior on ConnectException
+ doThrow(new QueueNotFoundException("Missing queue"))
+ .when(appManagerSpy).recoverApplication(
+ any(ApplicationState.class), any(RMStateStore.RMState.class));
+ appManagerSpy.recover(memStore.getState());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 9502eba..2701fba 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -67,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -99,6 +101,7 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -1611,33 +1614,21 @@ public void testAppFailedOnSubmissionSavedInStateStore() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
- MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
- @Override
- protected RMAppManager createRMAppManager() {
- return new TestRMAppManager(this.rmContext, this.scheduler,
- this.masterService, this.applicationACLsManager, conf);
- }
+ MockRM rm1 = new TestSecurityMockRM(conf, memStore);
+ rm1.start();
- class TestRMAppManager extends RMAppManager {
+ ApplicationSubmissionContext asc = ApplicationSubmissionContext
+ .newInstance(
+ ApplicationId.newInstance(0, 1), "app", "queue",
+ Priority.newInstance(1), mock(ContainerLaunchContext.class),
+ false, false, 2, Resources.createResource(256));
- public TestRMAppManager(RMContext context, YarnScheduler scheduler,
- ApplicationMasterService masterService,
- ApplicationACLsManager applicationACLsManager, Configuration conf) {
- super(context, scheduler, masterService, applicationACLsManager, conf);
- }
+ ApplicationSubmissionContext context = spy(asc);
+ doThrow(new IOException("Parsing credential error.")).
+ when(context).parseCredentials();
+
+ RMApp app1 = rm1.submitApp("user", context, false);
- @Override
- protected Credentials parseCredentials(
- ApplicationSubmissionContext application) throws IOException {
- throw new IOException("Parsing credential error.");
- }
- }
- };
- rm1.start();
- RMApp app1 =
- rm1.submitApp(200, "name", "user",
- new HashMap(), false, "default", -1,
- null, "MAPREDUCE", false);
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
// Check app staet is saved in state store.
Assert.assertEquals(RMAppState.FAILED, memStore.getState()
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 85d3895..8157f52 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -48,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.QueueNotFoundException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
@@ -570,10 +572,10 @@ public void testCapacitySchedulerRecovery() throws Exception {
// submission
//2. Remove one of the queues, restart the RM
//3. Verify that the expected exception was thrown
- @Test (timeout = 30000)
+ @Test (timeout = 30000, expected = QueueNotFoundException.class)
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
if (!schedulerClass.equals(CapacityScheduler.class)) {
- return;
+ throw new QueueNotFoundException("Dummy");
}
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
@@ -614,17 +616,7 @@ public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationOnlyA(csConf);
rm2 = new MockRM(csConf, memStore);
- boolean runtimeThrown = false;
- try {
- rm2.start();
- } catch (RuntimeException e) {
- //we're catching it because we want to verify the message
- //and we don't want to set it as an expected exception for the
- //test because we only want it to happen here
- assertTrue(e.getMessage().contains(B + " missing"));
- runtimeThrown = true;
- }
- assertTrue(runtimeThrown);
+ rm2.start();
}
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 6a66385..265815a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -73,6 +74,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+
+import org.apache.hadoop.yarn.server.resourcemanager.security
+ .DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -199,10 +203,11 @@ public void setUp() throws Exception {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
writer = mock(RMApplicationHistoryWriter.class);
- RMContext realRMContext =
+ DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class);
+ RMContext realRMContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
- null, new AMRMTokenSecretManager(conf, this.rmContext),
+ renewer, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),
@@ -248,7 +253,12 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
new ApplicationMasterService(rmContext, scheduler);
if(submissionContext == null) {
- submissionContext = new ApplicationSubmissionContextPBImpl();
+ submissionContext = new ApplicationSubmissionContextPBImpl() {
+ @Override
+ public Credentials parseCredentials() {
+ return null;
+ }
+ };
}
// applicationId will not be used because RMStateStore is mocked,
// but applicationId is still set for safety