diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java index b92538a..f1e3f03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java @@ -291,7 +291,7 @@ public void testE2ETokenSwap() throws Exception { } } - private ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, + protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, ApplicationId appId, MiniYARNCluster cluster, final Configuration yarnConf) throws IOException, InterruptedException, YarnException { @@ -331,7 +331,7 @@ public ApplicationMasterProtocol run() throws Exception { }); } - private AllocateRequest createAllocateRequest(List listNode) { + protected AllocateRequest createAllocateRequest(List listNode) { // The test needs AMRMClient to create a real allocate request AMRMClientImpl amClient = new AMRMClientImpl(); @@ -361,7 +361,7 @@ private AllocateRequest createAllocateRequest(List listNode) { new ArrayList(), resourceBlacklistRequest); } - private ApplicationId createApp(YarnClient yarnClient, + protected ApplicationId createApp(YarnClient yarnClient, MiniYARNCluster yarnCluster) throws Exception { ApplicationSubmissionContext appContext = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java new file mode 100644 index 0000000..b4dcf66 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Validates End2End Distributed Scheduling flow which includes the AM
+ * specifying OPPORTUNISTIC containers in its resource requests,
+ * the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
+ * the NM and the DistributedSchedulingProtocol used by the framework to talk
+ * to the DistributedSchedulingService running on the RM.
+ */
+public class TestDistributedScheduling extends TestAMRMProxy {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestDistributedScheduling.class);
+
+  /** Name given to the MiniYARNCluster started by each test. */
+  private static final String CLUSTER_NAME = "testDistributedSchedulingE2E";
+
+  /** Number of allocated containers asserted for each execution type. */
+  private static final int EXPECTED_CONTAINERS = 2;
+
+  /** Maximum allocate() polls made while waiting for GUARANTEED containers. */
+  private static final int MAX_ALLOCATE_ATTEMPTS = 5;
+
+  /** Pause before each allocate() poll, in milliseconds. */
+  private static final long ALLOCATE_POLL_INTERVAL_MS = 1000;
+
+  /**
+   * Validates if Allocate Requests containing only OPPORTUNISTIC container
+   * requests are satisfied instantly.
+   *
+   * @throws Exception if the cluster, the client or any RPC call fails
+   */
+  @Test(timeout = 60000)
+  public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
+    MiniYARNCluster cluster = new MiniYARNCluster(CLUSTER_NAME, 1, 1, 1);
+    YarnClient rmClient = null;
+
+    try {
+      final Configuration yarnConf = startCluster(cluster);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      ApplicationMasterProtocol client =
+          registerNewApplication(rmClient, cluster, yarnConf);
+
+      LOG.info("testDistributedSchedulingE2E - Allocate");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+      // Replace 'ANY' requests with OPPORTUNISTIC asks and remove
+      // everything else
+      request.setAskList(toOpportunisticAsks(request.getAskList()));
+
+      AllocateResponse allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+
+      // Ensure that all the requests are satisfied immediately
+      List<Container> allocated = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(EXPECTED_CONTAINERS, allocated.size());
+
+      // Verify that the allocated containers are OPPORTUNISTIC
+      assertExecutionType(ExecutionType.OPPORTUNISTIC, allocated);
+
+      finishApplication(client);
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  /**
+   * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
+   * container requests works as expected.
+   *
+   * @throws Exception if the cluster, the client or any RPC call fails
+   */
+  @Test(timeout = 60000)
+  public void testMixedExecutionTypeRequestE2E() throws Exception {
+    MiniYARNCluster cluster = new MiniYARNCluster(CLUSTER_NAME, 1, 1, 1);
+    YarnClient rmClient = null;
+
+    try {
+      final Configuration yarnConf = startCluster(cluster);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      ApplicationMasterProtocol client =
+          registerNewApplication(rmClient, cluster, yarnConf);
+
+      LOG.info("testDistributedSchedulingE2E - Allocate");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+      // Keep the original asks and duplicate all ANY requests, marking the
+      // copies as opportunistic
+      List<ResourceRequest> askList = request.getAskList();
+      List<ResourceRequest> newAskList = new ArrayList<>(askList);
+      newAskList.addAll(toOpportunisticAsks(askList));
+      request.setAskList(newAskList);
+
+      AllocateResponse allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+
+      // Ensure that all the OPPORTUNISTIC requests are satisfied immediately
+      List<Container> opportunistic = allocResponse.getAllocatedContainers();
+      Assert.assertEquals(EXPECTED_CONTAINERS, opportunistic.size());
+      assertExecutionType(ExecutionType.OPPORTUNISTIC, opportunistic);
+
+      // Nothing new to ask for: the following calls only collect the
+      // GUARANTEED containers for the asks sent above. A single
+      // fixed sleep makes the test timing dependent, so poll a bounded
+      // number of times instead.
+      request.setAskList(new ArrayList<ResourceRequest>());
+      List<Container> guaranteed = new ArrayList<>();
+      for (int attempt = 0; attempt < MAX_ALLOCATE_ATTEMPTS
+          && guaranteed.size() < EXPECTED_CONTAINERS; attempt++) {
+        request.setResponseId(request.getResponseId() + 1);
+        Thread.sleep(ALLOCATE_POLL_INTERVAL_MS);
+
+        allocResponse = client.allocate(request);
+        Assert.assertNotNull(allocResponse);
+        guaranteed.addAll(allocResponse.getAllocatedContainers());
+      }
+      Assert.assertEquals(EXPECTED_CONTAINERS, guaranteed.size());
+
+      // Verify that the allocated containers are GUARANTEED
+      assertExecutionType(ExecutionType.GUARANTEED, guaranteed);
+
+      finishApplication(client);
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  /**
+   * Starts the given cluster with AMRMProxy, distributed scheduling and NM
+   * container queuing enabled.
+   *
+   * @param cluster the not yet initialized cluster to start
+   * @return the cluster configuration, with the scheduler address pointing
+   *         at the AMRMProxy
+   */
+  private static Configuration startCluster(MiniYARNCluster cluster) {
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+    cluster.init(conf);
+    cluster.start();
+
+    // the client has to connect to AMRMProxy
+    Configuration yarnConf = cluster.getConfig();
+    yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+    return yarnConf;
+  }
+
+  /**
+   * Submits an application, registers its AM through the given configuration
+   * and checks the registration response and the resulting RM app state.
+   *
+   * @param rmClient started client used to submit the application
+   * @param cluster the running cluster
+   * @param yarnConf configuration used to create the AM protocol proxy
+   * @return the protocol proxy of the registered AM
+   * @throws Exception if submission or registration fails
+   */
+  private ApplicationMasterProtocol registerNewApplication(
+      YarnClient rmClient, MiniYARNCluster cluster,
+      final Configuration yarnConf) throws Exception {
+    // Submit application
+    ApplicationId appId = createApp(rmClient, cluster);
+    ApplicationMasterProtocol client =
+        createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+    LOG.info("testDistributedSchedulingE2E - Register");
+
+    RegisterApplicationMasterResponse responseRegister =
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+
+    Assert.assertNotNull(responseRegister);
+    Assert.assertNotNull(responseRegister.getQueue());
+    Assert.assertNotNull(responseRegister.getApplicationACLs());
+    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+    Assert
+        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+    RMApp rmApp =
+        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+    return client;
+  }
+
+  /**
+   * Builds an OPPORTUNISTIC copy of every ask whose resource name is
+   * {@link ResourceRequest#ANY}; all other asks are left out.
+   *
+   * @param asks the asks to copy from; not modified
+   * @return a new list holding only the OPPORTUNISTIC copies
+   */
+  private static List<ResourceRequest> toOpportunisticAsks(
+      List<ResourceRequest> asks) {
+    List<ResourceRequest> opportunisticAsks = new ArrayList<>();
+    for (ResourceRequest rr : asks) {
+      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+        opportunisticAsks.add(ResourceRequest.newInstance(rr.getPriority(),
+            rr.getResourceName(), rr.getCapability(), rr.getNumContainers(),
+            rr.getRelaxLocality(), rr.getNodeLabelExpression(),
+            ExecutionType.OPPORTUNISTIC));
+      }
+    }
+    return opportunisticAsks;
+  }
+
+  /**
+   * Asserts that the token of every given container carries the expected
+   * execution type.
+   *
+   * @param expected the execution type each container must have
+   * @param containers the allocated containers to check
+   * @throws IOException if the container token identifier cannot be read
+   */
+  private static void assertExecutionType(ExecutionType expected,
+      List<Container> containers) throws IOException {
+    for (Container container : containers) {
+      ContainerTokenIdentifier tokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(container.getContainerToken());
+      Assert.assertEquals(expected, tokenIdentifier.getExecutionType());
+    }
+  }
+
+  /**
+   * Calls finishApplicationMaster with a SUCCEEDED final status and checks
+   * that a response is returned.
+   *
+   * @param client the protocol proxy of a registered AM
+   * @throws Exception if the finish call fails
+   */
+  private static void finishApplication(ApplicationMasterProtocol client)
+      throws Exception {
+    LOG.info("testDistributedSchedulingE2E - Finish");
+
+    FinishApplicationMasterResponse responseFinish =
+        client.finishApplicationMaster(FinishApplicationMasterRequest
+            .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+
+    Assert.assertNotNull(responseFinish);
+  }
+
+  // The tests inherited from TestAMRMProxy are overridden with empty, ignored
+  // bodies, presumably so that they are not run again as part of this class.
+
+  @Ignore
+  @Override
+  public void testAMRMProxyE2E() throws Exception { }
+
+  @Ignore
+  @Override
+  public void testE2ETokenRenewal() throws Exception { }
+
+  @Ignore
+  @Override
+  public void testE2ETokenSwap() throws Exception { }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 35914c6..49bf4d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -23,9 +23,18 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .AllocateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -43,6 +52,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .DistSchedAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .DistSchedRegisterResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .AMLivelinessMonitor; @@ -139,18 +152,30 @@ public AllocateResponse allocate(AllocateRequest request) throws // ApplicationMasterProtocol clients RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class); - ApplicationMasterProtocol ampProxy = - (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol - .class, NetUtils.getConnectAddress(server), conf); - RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); + ApplicationMasterProtocolPB ampProxy = + RPC.getProxy(ApplicationMasterProtocolPB + .class, 1, NetUtils.getConnectAddress(server), conf); + RegisterApplicationMasterResponse regResp = + new RegisterApplicationMasterResponsePBImpl( + ampProxy.registerApplicationMaster(null, + ((RegisterApplicationMasterRequestPBImpl)factory + .newRecordInstance( + RegisterApplicationMasterRequest.class)).getProto())); Assert.assertEquals("dummyQueue", regResp.getQueue()); - 
FinishApplicationMasterResponse finishResp = ampProxy - .finishApplicationMaster(factory.newRecordInstance( - FinishApplicationMasterRequest.class)); + FinishApplicationMasterResponse finishResp = + new FinishApplicationMasterResponsePBImpl( + ampProxy.finishApplicationMaster(null, + ((FinishApplicationMasterRequestPBImpl)factory + .newRecordInstance( + FinishApplicationMasterRequest.class)).getProto() + )); Assert.assertEquals(false, finishResp.getIsUnregistered()); - AllocateResponse allocResp = ampProxy - .allocate(factory.newRecordInstance(AllocateRequest.class)); + AllocateResponse allocResp = + new AllocateResponsePBImpl( + ampProxy.allocate(null, + ((AllocateRequestPBImpl)factory + .newRecordInstance(AllocateRequest.class)).getProto()) + ); Assert.assertEquals(12345, allocResp.getNumClusterNodes()); @@ -158,17 +183,22 @@ public AllocateResponse allocate(AllocateRequest request) throws // DistributedSchedulerProtocol clients as well RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class, ProtobufRpcEngine.class); - DistributedSchedulerProtocol dsProxy = - (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol - .class, NetUtils.getConnectAddress(server), conf); + DistributedSchedulerProtocolPB dsProxy = + RPC.getProxy(DistributedSchedulerProtocolPB + .class, 1, NetUtils.getConnectAddress(server), conf); DistSchedRegisterResponse dsRegResp = - dsProxy.registerApplicationMasterForDistributedScheduling( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); + new DistSchedRegisterResponsePBImpl( + dsProxy.registerApplicationMasterForDistributedScheduling(null, + ((RegisterApplicationMasterRequestPBImpl)factory + .newRecordInstance(RegisterApplicationMasterRequest.class)) + .getProto())); Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); DistSchedAllocateResponse dsAllocResp = - dsProxy.allocateForDistributedScheduling( - factory.newRecordInstance(AllocateRequest.class)); + new 
DistSchedAllocateResponsePBImpl( + dsProxy.allocateForDistributedScheduling(null, + ((AllocateRequestPBImpl)factory + .newRecordInstance(AllocateRequest.class)).getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2704129..2372ea2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -861,6 +861,11 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, localToken); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); + // The DefaultRequestInterceptor will generally be the last + // interceptor + while (rt.getNextInterceptor() != null) { + rt = rt.getNextInterceptor(); + } if (rt instanceof DefaultRequestInterceptor) { ((DefaultRequestInterceptor) rt) .setRMClient(getResourceManager().getApplicationMasterService());