diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index d892c5d..370744a 100644
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.security.token.delegation;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
@@ -295,7 +296,7 @@ private synchronized void removeExpiredKeys() {
* if the token is expired. Note that this method should be called with
* acquiring the secret manager's monitor.
*/
- protected DelegationTokenInformation checkToken(TokenIdent identifier)
+ public DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = currentTokens.get(identifier);
@@ -308,7 +309,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
}
return info;
}
-
+
@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
@@ -330,6 +331,20 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
return info.getTrackingId();
}
+
+ /**
+ * Find the DelegationTokenInformation for the given token id.
+ * Return null if the token is not found or has expired.
+ */
+ @Private
+ public synchronized DelegationTokenInformation getToken(TokenIdent identifier) {
+ DelegationTokenInformation info = currentTokens.get(identifier);
+ if (info == null || info.getRenewDate() < Time.now()) {
+ return null;
+ }
+ return info;
+ }
+
/**
* Verifies that the given identifier and password are valid and match.
* @param identifier Token identifier.
@@ -469,7 +484,7 @@ public long getRenewDate() {
return renewDate;
}
/** returns password */
- byte[] getPassword() {
+ public byte[] getPassword() {
return password;
}
/** returns tracking id */
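The new getToken() accessor, together with the now-public getPassword(), enables a check-then-reuse pattern for delegation tokens. A minimal sketch of that pattern follows (rmDTSecretManager and tokenIdentifier are illustrative variables, not part of this patch); ClientRMService further down applies the same idea in getDelegationToken():

    // Reuse the stored password of a live token; only mint a new token when
    // getToken() reports it as missing or expired (i.e. returns null).
    DelegationTokenInformation info = rmDTSecretManager.getToken(tokenIdentifier);
    byte[] password;
    if (info != null) {
      password = info.getPassword();      // recover the existing token's secret
    } else {
      Token<RMDelegationTokenIdentifier> newToken =
          new Token<RMDelegationTokenIdentifier>(tokenIdentifier, rmDTSecretManager);
      password = newToken.getPassword();  // issue a brand-new token
    }
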
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 8db9151..4951a1e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -104,6 +104,7 @@
*/
@Public
@Stable
+ @Idempotent
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request)
throws YarnException, IOException;
@@ -133,6 +134,10 @@ public GetNewApplicationResponse getNewApplication(
* it encounters the {@link ApplicationNotFoundException} on the
* {@link #getApplicationReport(GetApplicationReportRequest)} call.
*
+ * During the submission process, it checks whether the application
+ * already exists. If it does, it will simply return a
+ * SubmitApplicationResponse.
+ *
* In secure mode,the ResourceManager verifies access to
* queues etc. before accepting the application submission.
*
@@ -147,6 +152,7 @@ public GetNewApplicationResponse getNewApplication(
*/
@Public
@Stable
+ @Idempotent
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request)
throws YarnException, IOException;
@@ -173,6 +179,7 @@ public SubmitApplicationResponse submitApplication(
*/
@Public
@Stable
+ @Idempotent
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request)
throws YarnException, IOException;
@@ -231,6 +238,7 @@ public GetApplicationReportResponse getApplicationReport(
*/
@Public
@Stable
+ @Idempotent
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request)
throws YarnException, IOException;
@@ -258,6 +266,7 @@ public GetClusterMetricsResponse getClusterMetrics(
*/
@Public
@Stable
+ @Idempotent
public GetApplicationsResponse getApplications(
GetApplicationsRequest request)
throws YarnException, IOException;
@@ -277,6 +286,7 @@ public GetApplicationsResponse getApplications(
*/
@Public
@Stable
+ @Idempotent
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request)
throws YarnException, IOException;
@@ -298,6 +308,7 @@ public GetClusterNodesResponse getClusterNodes(
*/
@Public
@Stable
+ @Idempotent
public GetQueueInfoResponse getQueueInfo(
GetQueueInfoRequest request)
throws YarnException, IOException;
@@ -317,6 +328,7 @@ public GetQueueInfoResponse getQueueInfo(
*/
@Public
@Stable
+ @Idempotent
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request)
throws YarnException, IOException;
@@ -328,6 +340,13 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
* The ResourceManager responds with the delegation
* {@link Token} that can be used by the client to speak to this
* service.
+ *
+ * It will also check whether the token already exists
+ * before creating a new token.
+ * If the token exists, the token will be recovered and returned
+ * to the client.
+ * If the token does not exist or has expired, a new token will
+ * be created and returned to the client.
* @param request request to get a delegation token for the client.
* @return delegation token that can be used to talk to this service
* @throws YarnException
@@ -335,6 +354,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
*/
@Public
@Stable
+ @Idempotent
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request)
throws YarnException, IOException;
@@ -349,6 +369,7 @@ public GetDelegationTokenResponse getDelegationToken(
*/
@Private
@Unstable
+ @Idempotent
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException,
IOException;
@@ -363,6 +384,7 @@ public RenewDelegationTokenResponse renewDelegationToken(
*/
@Private
@Unstable
+ @Idempotent
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException,
IOException;
@@ -377,6 +399,7 @@ public CancelDelegationTokenResponse cancelDelegationToken(
*/
@Public
@Unstable
+ @Idempotent
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
@@ -422,6 +445,7 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
*/
@Public
@Unstable
+ @Idempotent
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request) throws YarnException,
IOException;
@@ -453,6 +477,7 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
*/
@Public
@Unstable
+ @Idempotent
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException;
@@ -486,6 +511,7 @@ public GetApplicationAttemptsResponse getApplicationAttempts(
*/
@Public
@Unstable
+ @Idempotent
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException;
@@ -520,6 +546,7 @@ public GetContainerReportResponse getContainerReport(
*/
@Public
@Unstable
+ @Idempotent
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException;
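Marking these calls @Idempotent is what allows the RM failover proxy to retry them against the newly active ResourceManager. A minimal sketch of why that is safe for submitApplication, with the retry written out by hand purely for illustration (conf and appSubmissionContext are assumed to be built elsewhere; in practice the proxy returned by ClientRMProxy performs the retry automatically):

    ApplicationClientProtocol rm =
        ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
    SubmitApplicationRequest request =
        SubmitApplicationRequest.newInstance(appSubmissionContext);
    try {
      rm.submitApplication(request);
    } catch (IOException e) {
      // The connection may drop mid-call during an RM failover. Because the
      // call is idempotent, re-sending the same request is safe: if the first
      // attempt already registered the application, the new active RM simply
      // returns a SubmitApplicationResponse instead of rejecting a duplicate.
      rm.submitApplication(request);
    }
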
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
index 2641599..4b777ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -50,16 +51,19 @@
@Public
@Stable
+ @Idempotent
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
@@ -67,18 +71,21 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
@Public
@Stable
+ @Idempotent
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request)
throws YarnException, IOException;
@Public
@Stable
+ @Idempotent
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnException, IOException;
@@ -99,6 +106,7 @@ public RefreshServiceAclsResponse refreshServiceAcls(
*/
@Public
@Evolving
+ @Idempotent
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request)
throws YarnException, IOException;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
new file mode 100644
index 0000000..29427e6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestApplicationClientProtocolOnHA extends TestProtocolHA {
+ private YarnClient client = null;
+
+ @Before
+ public void initiate() throws Exception {
+ startHACluster(1, true, false);
+ Configuration conf = new YarnConfiguration(this.conf);
+ client = createAndStartYarnClient(conf);
+ }
+
+ @After
+ public void shutDown() {
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationReportOnHA() throws Exception {
+ ApplicationReport report =
+ client.getApplicationReport(cluster.createFakeAppId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeAppReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetNewApplicationOnHA() throws Exception {
+ ApplicationId appId =
+ client.createApplication().getApplicationSubmissionContext()
+ .getApplicationId();
+ Assert.assertTrue(appId != null);
+ Assert.assertEquals(cluster.createFakeAppId(), appId);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetClusterMetricsOnHA() throws Exception {
+ YarnClusterMetrics clusterMetrics =
+ client.getYarnClusterMetrics();
+ Assert.assertTrue(clusterMetrics != null);
+ Assert.assertEquals(cluster.createFakeYarnClusterMetrics(),
+ clusterMetrics);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationsOnHA() throws Exception {
+ List<ApplicationReport> reports =
+ client.getApplications();
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeAppReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetClusterNodesOnHA() throws Exception {
+ List<NodeReport> reports = client.getNodeReports(NodeState.RUNNING);
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeNodeReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetQueueInfoOnHA() throws Exception {
+ QueueInfo queueInfo = client.getQueueInfo("root");
+ Assert.assertTrue(queueInfo != null);
+ Assert.assertEquals(cluster.createFakeQueueInfo(),
+ queueInfo);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetQueueUserAclsOnHA() throws Exception {
+ List<QueueUserACLInfo> queueUserAclsList = client.getQueueAclsInfo();
+ Assert.assertTrue(queueUserAclsList != null
+ && !queueUserAclsList.isEmpty());
+ Assert.assertEquals(cluster.createFakeQueueUserACLInfoList(),
+ queueUserAclsList);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationAttemptReportOnHA() throws Exception {
+ ApplicationAttemptReport report =
+ client.getApplicationAttemptReport(cluster
+ .createFakeApplicationAttemptId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeApplicationAttemptReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetApplicationAttemptsOnHA() throws Exception {
+ List<ApplicationAttemptReport> reports =
+ client.getApplicationAttempts(cluster.createFakeAppId());
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeApplicationAttemptReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetContainerReportOnHA() throws Exception {
+ ContainerReport report =
+ client.getContainerReport(cluster.createFakeContainerId());
+ Assert.assertTrue(report != null);
+ Assert.assertEquals(cluster.createFakeContainerReport(), report);
+ }
+
+ @Test(timeout = 15000)
+ public void testGetContainersOnHA() throws Exception {
+ List<ContainerReport> reports =
+ client.getContainers(cluster.createFakeApplicationAttemptId());
+ Assert.assertTrue(reports != null && !reports.isEmpty());
+ Assert.assertEquals(cluster.createFakeContainerReports(),
+ reports);
+ }
+
+ @Test(timeout = 15000)
+ public void testSubmitApplicationOnHA() throws Exception {
+ ApplicationSubmissionContext appContext =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(cluster.createFakeAppId());
+ ContainerLaunchContext amContainer =
+ Records.newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(10);
+ capability.setVirtualCores(1);
+ appContext.setResource(capability);
+ ApplicationId appId = client.submitApplication(appContext);
+ Assert.assertTrue(getActiveRM().getRMContext().getRMApps()
+ .containsKey(appId));
+ }
+
+ @Test(timeout = 15000)
+ public void testMoveApplicationAcrossQueuesOnHA() throws Exception{
+ client.moveApplicationAcrossQueues(cluster.createFakeAppId(), "root");
+ }
+
+ @Test(timeout = 15000)
+ public void testForceKillApplicationOnHA() throws Exception {
+ client.killApplication(cluster.createFakeAppId());
+ }
+
+ @Test(timeout = 15000)
+ public void testGetDelegationTokenOnHA() throws Exception {
+ Token token = client.getRMDelegationToken(new Text(" "));
+ Assert.assertEquals(token, cluster.createFakeToken());
+ }
+
+ @Test(timeout = 15000)
+ public void testRenewDelegationTokenOnHA() throws Exception {
+ RenewDelegationTokenRequest request =
+ RenewDelegationTokenRequest.newInstance(cluster.createFakeToken());
+ long newExpirationTime =
+ ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+ .renewDelegationToken(request).getNextExpirationTime();
+ Assert.assertEquals(newExpirationTime, cluster.createNextExpirationTime());
+ }
+
+ @Test(timeout = 15000)
+ public void testCancelDelegationTokenOnHA() throws Exception {
+ CancelDelegationTokenRequest request =
+ CancelDelegationTokenRequest.newInstance(cluster.createFakeToken());
+ ClientRMProxy.createRMProxy(this.conf, ApplicationClientProtocol.class)
+ .cancelDelegationToken(request);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestProtocolHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestProtocolHA.java
new file mode 100644
index 0000000..f659911
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestProtocolHA.java
@@ -0,0 +1,188 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.junit.After;
+import org.junit.Before;
+
+
+public abstract class TestProtocolHA extends ClientBaseWithFixes{
+ protected static final HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+ protected static final String RM1_NODE_ID = "rm1";
+ protected static final int RM1_PORT_BASE = 10000;
+ protected static final String RM2_NODE_ID = "rm2";
+ protected static final int RM2_PORT_BASE = 20000;
+
+ protected Configuration conf;
+ protected MiniYARNCluster cluster;
+
+ protected Thread failoverThread = null;
+
+ private void setConfForRM(String rmId, String prefix, String value) {
+ conf.set(HAUtil.addSuffix(prefix, rmId), value);
+ }
+
+ private void setRpcAddressForRM(String rmId, int base) {
+ setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
+ setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
+ (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
+ }
+
+ @Before
+ public void setup() throws IOException {
+ failoverThread = null;
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+ setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
+ setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
+
+ conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (failoverThread != null) {
+ failoverThread.join();
+ }
+ cluster.stop();
+ }
+
+ protected AdminService getAdminService(int index) {
+ return cluster.getResourceManager(index).getRMContext().getRMAdminService();
+ }
+
+ protected void explicitFailover() throws IOException {
+ int activeRMIndex = cluster.getActiveRMIndex();
+ int newActiveRMIndex = (activeRMIndex + 1) % 2;
+ getAdminService(activeRMIndex).transitionToStandby(req);
+ getAdminService(newActiveRMIndex).transitionToActive(req);
+ assertEquals("Failover failed", newActiveRMIndex, cluster.getActiveRMIndex());
+ }
+
+ protected YarnClient createAndStartYarnClient(Configuration conf) {
+ Configuration configuration = new YarnConfiguration(conf);
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(configuration);
+ client.start();
+ return client;
+ }
+
+ protected void verifyConnections() throws InterruptedException, YarnException {
+ assertTrue("NMs failed to connect to the RM",
+ cluster.waitForNodeManagersToConnect(20000));
+ verifyClientConnection();
+ }
+
+ protected void verifyClientConnection() {
+ int numRetries = 3;
+ while(numRetries-- > 0) {
+ Configuration conf = new YarnConfiguration(this.conf);
+ YarnClient client = createAndStartYarnClient(conf);
+ try {
+ client.getApplications();
+ return;
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ } finally {
+ client.stop();
+ }
+ }
+ fail("Client couldn't connect to the Active RM");
+ }
+
+ protected Thread createAndStartFailoverThread() {
+ Thread failoverThread = new Thread() {
+ boolean keepRunning = true;
+ public void run() {
+ while (keepRunning) {
+ if (MiniYARNCluster.startFailover.get()) {
+ try {
+ explicitFailover();
+ keepRunning = false;
+ } catch (Exception e) {
+ // Do Nothing
+ } finally {
+ keepRunning = false;
+ }
+ }
+ }
+ }
+ };
+ failoverThread.start();
+ return failoverThread;
+ }
+
+ protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
+ boolean overrideRTS) throws Exception {
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ cluster =
+ new MiniYARNCluster(TestRMFailover.class.getName(), 2, numOfNMs, 1, 1, false,
+ overrideClientRMService, overrideRTS);
+ cluster.resetStartFailoverFlag(false);
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+
+ // Do the failover
+ explicitFailover();
+ verifyConnections();
+
+ failoverThread = createAndStartFailoverThread();
+
+ }
+
+ protected ResourceManager getActiveRM() {
+ return cluster.getResourceManager(cluster.getActiveRMIndex());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
new file mode 100644
index 0000000..3387d46
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -0,0 +1,91 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestResourceTrackerOnHA extends TestProtocolHA{
+
+ private ResourceTracker resourceTracker = null;
+
+ @Before
+ public void initiate() throws Exception {
+ startHACluster(0, false, true);
+ this.resourceTracker = getRMClient();
+ }
+
+ @After
+ public void shutDown() {
+ if(this.resourceTracker != null) {
+ RPC.stopProxy(this.resourceTracker);
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testResourceTrackerOnHA() throws Exception {
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+ Resource resource = Resource.newInstance(2048, 4);
+
+ // make sure registerNodeManager works when failover happens
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
+ YarnVersionInfo.getVersion(), null);
+ resourceTracker.registerNodeManager(request);
+ Assert.assertTrue(waitForNodeManagerToConnect(2000, nodeId));
+
+ // restart the failover thread, and make sure nodeHeartbeat works
+ failoverThread = createAndStartFailoverThread();
+ NodeStatus status =
+ NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
+ null, null);
+ NodeHeartbeatRequest request2 =
+ NodeHeartbeatRequest.newInstance(status, null, null);
+ resourceTracker.nodeHeartbeat(request2);
+ }
+
+ private ResourceTracker getRMClient() throws IOException {
+ return ServerRMProxy.createRMProxy(this.conf, ResourceTracker.class);
+ }
+
+ private boolean waitForNodeManagerToConnect(int timeout, NodeId nodeId)
+ throws Exception {
+ for (int i = 0; i < timeout / 100; i++) {
+ if (getActiveRM().getRMContext().getRMNodes().containsKey(nodeId)) {
+ return true;
+ }
+ Thread.sleep(100);
+ }
+ return false;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
index 56cc317..ad8a625 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
@@ -19,6 +19,8 @@
import java.io.IOException;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -27,10 +29,12 @@
public interface ResourceTracker {
+ @Idempotent
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException;
+ @AtMostOnce
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException;
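The two annotations above give the NodeManager-side retrying proxy different retry contracts for registration and heartbeats. A minimal sketch of the calls that go through that proxy, mirroring TestResourceTrackerOnHA earlier in this patch (nodeId, resource, and conf are assumed to be set up as in that test):

    ResourceTracker tracker =
        ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
    RegisterNodeManagerRequest register =
        RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
            YarnVersionInfo.getVersion(), null);
    tracker.registerNodeManager(register);  // safe to repeat on the new active RM
    NodeStatus status = NodeStatus.newInstance(nodeId, 0, null, null, null);
    tracker.nodeHeartbeat(NodeHeartbeatRequest.newInstance(status, null, null));
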
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 7a7ff94..a1ab246 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -505,16 +506,12 @@ public SubmitApplicationResponse submitApplication(
throw RPCUtil.getRemoteException(ie);
}
- // Though duplication will checked again when app is put into rmContext,
- // but it is good to fail the invalid submission as early as possible.
+ // Check whether the app has already been put into rmContext.
+ // If it has, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {
- String message = "Application with id " + applicationId +
- " is already present! Cannot add a duplicate!";
- LOG.warn(message);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- message, "ClientRMService", "Exception in submitting application",
- applicationId);
- throw RPCUtil.getRemoteException(message);
+ RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
+ "ClientRMService", applicationId);
+ return SubmitApplicationResponse.newInstance();
}
if (submissionContext.getQueue() == null) {
@@ -868,18 +865,33 @@ public GetDelegationTokenResponse getDelegationToken(
realUser = new Text(ugi.getRealUser().getUserName());
}
RMDelegationTokenIdentifier tokenIdentifier =
- new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
- realUser);
- Token<RMDelegationTokenIdentifier> realRMDTtoken =
- new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
- this.rmDTSecretManager);
- response.setRMDelegationToken(
- BuilderUtils.newDelegationToken(
- realRMDTtoken.getIdentifier(),
- realRMDTtoken.getKind().toString(),
- realRMDTtoken.getPassword(),
- realRMDTtoken.getService().toString()
- ));
+ new RMDelegationTokenIdentifier(owner, new Text(
+ request.getRenewer()), realUser);
+ // Check whether the token already exists.
+ // If it does, recover the token from its stored
+ // DelegationTokenInformation
+ DelegationTokenInformation dtInfo =
+ this.rmDTSecretManager.getToken(tokenIdentifier);
+ if (dtInfo != null) {
+ response.setRMDelegationToken(
+ BuilderUtils.newDelegationToken(
+ tokenIdentifier.getBytes(),
+ tokenIdentifier.getKind().toString(),
+ dtInfo.getPassword(),
+ (new Text()).toString()
+ ));
+ } else {
+ Token<RMDelegationTokenIdentifier> realRMDTtoken =
+ new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
+ this.rmDTSecretManager);
+ response.setRMDelegationToken(
+ BuilderUtils.newDelegationToken(
+ realRMDTtoken.getIdentifier(),
+ realRMDTtoken.getKind().toString(),
+ realRMDTtoken.getPassword(),
+ realRMDTtoken.getService().toString()
+ ));
+ }
return response;
} catch(IOException io) {
throw RPCUtil.getRemoteException(io);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 30af77d..354c977 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -718,7 +718,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
// TODO: Write out change to state store (YARN-1558)
-
+ // Also take care of RM failover
moveEvent.getResult().set(null);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 852a0e1..13ab17c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -589,10 +589,8 @@ public void handle(Event event) {}
// duplicate appId
try {
rmService.submitApplication(submitRequest2);
- Assert.fail("Exception is expected.");
} catch (YarnException e) {
- Assert.assertTrue("The thrown exception is not expected.",
- e.getMessage().contains("Cannot add a duplicate!"));
+ Assert.fail("Exception is not expected.");
}
GetApplicationsRequest getAllAppsRequest =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
index 484ba89..e434e47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java
@@ -219,4 +219,77 @@ public void testGetApplicationReportIdempotent() throws Exception{
Assert.assertEquals(appReport3.getYarnApplicationState(),
appReport4.getYarnApplicationState());
}
+
+ // There are two scenarios when RM failover happens
+ // during the SubmitApplication call:
+ // 1) RMStateStore already saved the ApplicationState when failover happens
+ // 2) RMStateStore did not save the ApplicationState when failover happens
+ @Test (timeout = 5000)
+ public void
+ testHandleRMHADuringSubmitApplicationCallWithSavedApplicationState()
+ throws Exception {
+ // Test scenario 1 when RM failover happens
+ // during the SubmitApplication call:
+ // RMStateStore already saved the ApplicationState when failover happens
+ startRMs();
+
+ // Submit Application
+ // After submission, the applicationState will be saved in RMStateStore.
+ RMApp app0 = rm1.submitApp(200);
+
+ // Do the failover
+ explicitFailover();
+
+ // Since the applicationState has already been saved in RMStateStore
+ // before failover happens, the currently active RM can load the previous
+ // applicationState.
+ // When we re-submit the application with the same applicationId, it will
+ // check whether this application already exists. If it does, it simply
+ // returns a SubmitApplicationResponse.
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, app0.getApplicationId());
+
+ Assert.assertEquals(app1.getApplicationId(), app0.getApplicationId());
+ }
+
+ @Test (timeout = 5000)
+ public void
+ testHandleRMHADuringSubmitApplicationCallWithoutSavedApplicationState()
+ throws Exception {
+ // Test scenario 2 when RM failover happens
+ // during the SubmitApplication call:
+ // RMStateStore did not save the ApplicationState when failover happens.
+ // Using customized RMAppManager.
+ startRMsWithCustomizedRMAppManager();
+
+ // Submit Application
+ // After submission, the applicationState will
+ // not be saved in RMStateStore
+ RMApp app0 =
+ rm1.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false);
+
+ // Do the failover
+ explicitFailover();
+
+ // Submit the application with the previous ApplicationId to the current
+ // active RM. This mimics the behavior of ApplicationClientProtocol#
+ // submitApplication() when failover happens during the submission process,
+ // because the submitApplication API is marked as idempotent.
+ RMApp app1 =
+ rm2.submitApp(200, "", UserGroupInformation
+ .getCurrentUser().getShortUserName(), null, false, null,
+ configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
+ false, false, true, app0.getApplicationId());
+
+ verifySubmitApp(rm2, app1, app0.getApplicationId());
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 0adbf65..15ae839 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -22,10 +22,12 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -39,8 +41,59 @@
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -64,12 +117,24 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -94,6 +159,7 @@
public class MiniYARNCluster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);
+ public final static AtomicBoolean startFailover = new AtomicBoolean(false);
// temp fix until metrics system can auto-detect itself running in unit test:
static {
@@ -120,6 +186,8 @@
// Number of nm-log-dirs per nodemanager
private int numLogDirs;
private boolean enableAHS;
+ private boolean overrideClientRMService;
+ private boolean overrideRTS;
/**
* @param testName name of the test
@@ -128,14 +196,19 @@
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
* @param enableAHS enable ApplicationHistoryServer or not
+ * @param overrideClientRMService override ClientRMService or not
+ * @param overrideRTS override ResourceTrackerService or not
*/
public MiniYARNCluster(
String testName, int numResourceManagers, int numNodeManagers,
- int numLocalDirs, int numLogDirs, boolean enableAHS) {
+ int numLocalDirs, int numLogDirs, boolean enableAHS,
+ boolean overrideClientRMService, boolean overrideRTS) {
super(testName.replace("$", ""));
this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs;
this.enableAHS = enableAHS;
+ this.overrideClientRMService = overrideClientRMService;
+ this.overrideRTS = overrideRTS;
String testSubDir = testName.replace("$", "");
File targetWorkDir = new File("target", testSubDir);
try {
@@ -191,12 +264,26 @@ public MiniYARNCluster(
* @param numNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
+ * @param enableAHS enable ApplicationHistoryServer or not
+ */
+ public MiniYARNCluster(
+ String testName, int numResourceManagers, int numNodeManagers,
+ int numLocalDirs, int numLogDirs, boolean enableAHS) {
+ this(testName, numResourceManagers, numNodeManagers, numLocalDirs,
+ numLogDirs, enableAHS, false, false);
+ }
+ /**
+ * @param testName name of the test
+ * @param numResourceManagers the number of resource managers in the cluster
+ * @param numNodeManagers the number of node managers in the cluster
+ * @param numLocalDirs the number of nm-local-dirs per nodemanager
+ * @param numLogDirs the number of nm-log-dirs per nodemanager
*/
public MiniYARNCluster(
String testName, int numResourceManagers, int numNodeManagers,
int numLocalDirs, int numLogDirs) {
this(testName, numResourceManagers, numNodeManagers, numLocalDirs,
- numLogDirs, false);
+ numLogDirs, false, false, false);
}
/**
@@ -248,6 +335,26 @@ public void serviceInit(Configuration conf) throws Exception {
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcases.
}
+ @Override
+ protected ClientRMService createClientRMService() {
+ if (overrideClientRMService) {
+ return new CustomedClientRMService(this.rmContext, this.scheduler,
+ this.rmAppManager, this.applicationACLsManager,
+ this.queueACLsManager,
+ this.rmContext.getRMDelegationTokenSecretManager());
+ }
+ return super.createClientRMService();
+ }
+ @Override
+ protected ResourceTrackerService createResourceTrackerService() {
+ if (overrideRTS) {
+ return new CustomedResourceTrackerService(this.rmContext,
+ this.nodesListManager, this.nmLivelinessMonitor,
+ this.rmContext.getContainerTokenSecretManager(),
+ this.rmContext.getNMTokenSecretManager());
+ }
+ return super.createResourceTrackerService();
+ }
};
if (!useFixedPorts) {
if (HAUtil.isHAEnabled(conf)) {
@@ -676,7 +783,304 @@ public boolean waitForNodeManagersToConnect(long timeout)
}
return false;
}
-
+
+ private class CustomedClientRMService extends ClientRMService {
+ public CustomedClientRMService(RMContext rmContext,
+ YarnScheduler scheduler, RMAppManager rmAppManager,
+ ApplicationACLsManager applicationACLsManager,
+ QueueACLsManager queueACLsManager,
+ RMDelegationTokenSecretManager rmDTSecretManager) {
+ super(rmContext, scheduler, rmAppManager, applicationACLsManager,
+ queueACLsManager, rmDTSecretManager);
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // create the GetNewApplicationResponse with fake applicationId
+ GetNewApplicationResponse response =
+ GetNewApplicationResponse.newInstance(createFakeAppId(), null, null);
+ return response;
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // create a fake application report
+ ApplicationReport report = createFakeAppReport();
+ GetApplicationReportResponse response =
+ GetApplicationReportResponse.newInstance(report);
+ return response;
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // create GetClusterMetricsResponse with fake YarnClusterMetrics
+ GetClusterMetricsResponse response =
+ GetClusterMetricsResponse.newInstance(
+ createFakeYarnClusterMetrics());
+ return response;
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(
+ GetApplicationsRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // create GetApplicationsResponse with fake applicationList
+ GetApplicationsResponse response =
+ GetApplicationsResponse.newInstance(createFakeAppReports());
+ return response;
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(
+ GetClusterNodesRequest request)
+ throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // create GetClusterNodesResponse with fake ClusterNodeLists
+ GetClusterNodesResponse response =
+ GetClusterNodesResponse.newInstance(createFakeNodeReports());
+ return response;
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // return fake QueueInfo
+ return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+
+ // return fake queueUserAcls
+ return GetQueueUserAclsInfoResponse
+ .newInstance(createFakeQueueUserACLInfoList());
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ // return fake ApplicationAttemptReport
+ return GetApplicationAttemptReportResponse
+ .newInstance(createFakeApplicationAttemptReport());
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ // return fake ApplicationAttemptReports
+ return GetApplicationAttemptsResponse
+ .newInstance(createFakeApplicationAttemptReports());
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+
+ // return fake containerReport
+ return GetContainerReportResponse
+ .newInstance(createFakeContainerReport());
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+
+ // return fake ContainerReports
+ return GetContainersResponse.newInstance(createFakeContainerReports());
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return super.submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return KillApplicationResponse.newInstance(true);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return GetDelegationTokenResponse.newInstance(createFakeToken());
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return RenewDelegationTokenResponse
+ .newInstance(createNextExpirationTime());
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException {
+ resetStartFailoverFlag(true);
+ return CancelDelegationTokenResponse.newInstance();
+ }
+ }
+
+ public ApplicationReport createFakeAppReport() {
+ ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+ // create a fake application report
+ ApplicationReport report =
+ ApplicationReport.newInstance(appId, attemptId, "fakeUser",
+ "fakeQueue", "fakeApplicationName", "localhost", 0, null,
+ YarnApplicationState.FAILED, "fake an application report", "",
+ 1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
+ "fakeApplicationType", null);
+ return report;
+ }
+
+ public List<ApplicationReport> createFakeAppReports() {
+ List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
+ reports.add(createFakeAppReport());
+ return reports;
+ }
+
+ public ApplicationId createFakeAppId() {
+ return ApplicationId.newInstance(1000l, 1);
+ }
+
+ public ApplicationAttemptId createFakeApplicationAttemptId() {
+ return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
+ }
+
+ public ContainerId createFakeContainerId() {
+ return ContainerId.newInstance(createFakeApplicationAttemptId(), 0);
+ }
+
+ public YarnClusterMetrics createFakeYarnClusterMetrics() {
+ return YarnClusterMetrics.newInstance(1);
+ }
+
+ public List<NodeReport> createFakeNodeReports() {
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+ NodeReport report =
+ NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost", "rack1",
+ null, null, 4, null, 1000l);
+ List<NodeReport> reports = new ArrayList<NodeReport>();
+ reports.add(report);
+ return reports;
+ }
+
+ public QueueInfo createFakeQueueInfo() {
+ return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
+ createFakeAppReports(), QueueState.RUNNING);
+ }
+
+ public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
+ List<QueueACL> queueACL = new ArrayList<QueueACL>();
+ queueACL.add(QueueACL.SUBMIT_APPLICATIONS);
+ QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL);
+ List<QueueUserACLInfo> infos = new ArrayList<QueueUserACLInfo>();
+ infos.add(info);
+ return infos;
+ }
+
+ public ApplicationAttemptReport createFakeApplicationAttemptReport() {
+ return ApplicationAttemptReport.newInstance(
+ createFakeApplicationAttemptId(), "localhost", 0, "", "",
+ YarnApplicationAttemptState.RUNNING, createFakeContainerId());
+ }
+
+ public List<ApplicationAttemptReport> createFakeApplicationAttemptReports() {
+ List<ApplicationAttemptReport> reports =
+ new ArrayList<ApplicationAttemptReport>();
+ reports.add(createFakeApplicationAttemptReport());
+ return reports;
+ }
+
+ public ContainerReport createFakeContainerReport() {
+ return ContainerReport.newInstance(createFakeContainerId(), null,
+ NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+ ContainerState.COMPLETE);
+ }
+
+ public List<ContainerReport> createFakeContainerReports() {
+ List<ContainerReport> reports =
+ new ArrayList<ContainerReport>();
+ reports.add(createFakeContainerReport());
+ return reports;
+ }
+
+ public Token createFakeToken() {
+ String identifier = "fake Token";
+ String password = "fake token passwd";
+ Token token = Token.newInstance(
+ identifier.getBytes(), " ", password.getBytes(), " ");
+ return token;
+ }
+
+ public long createNextExpirationTime() {
+ return "fake Token".getBytes().length;
+ }
+
+ private class CustomedResourceTrackerService extends ResourceTrackerService {
+ public CustomedResourceTrackerService(RMContext rmContext,
+ NodesListManager nodesListManager,
+ NMLivelinessMonitor nmLivelinessMonitor,
+ RMContainerTokenSecretManager containerTokenSecretManager,
+ NMTokenSecretManagerInRM nmTokenSecretManager) {
+ super(rmContext, nodesListManager, nmLivelinessMonitor,
+ containerTokenSecretManager, nmTokenSecretManager);
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException,
+ IOException {
+ resetStartFailoverFlag(true);
+
+ return super.registerNodeManager(request);
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+
+ return super.nodeHeartbeat(request);
+ }
+ }
+
private class ApplicationHistoryServerWrapper extends AbstractService {
public ApplicationHistoryServerWrapper() {
super(ApplicationHistoryServerWrapper.class.getName());
@@ -736,4 +1140,8 @@ protected synchronized void serviceStop() throws Exception {
public ApplicationHistoryServer getApplicationHistoryServer() {
return this.appHistoryServer;
}
+
+ public void resetStartFailoverFlag(boolean flag) {
+ startFailover.set(flag);
+ }
}