diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java index b92538a..f1e3f03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java @@ -291,7 +291,7 @@ public void testE2ETokenSwap() throws Exception { } } - private ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, + protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient, ApplicationId appId, MiniYARNCluster cluster, final Configuration yarnConf) throws IOException, InterruptedException, YarnException { @@ -331,7 +331,7 @@ public ApplicationMasterProtocol run() throws Exception { }); } - private AllocateRequest createAllocateRequest(List listNode) { + protected AllocateRequest createAllocateRequest(List listNode) { // The test needs AMRMClient to create a real allocate request AMRMClientImpl amClient = new AMRMClientImpl(); @@ -361,7 +361,7 @@ private AllocateRequest createAllocateRequest(List listNode) { new ArrayList(), resourceBlacklistRequest); } - private ApplicationId createApp(YarnClient yarnClient, + protected ApplicationId createApp(YarnClient yarnClient, MiniYARNCluster yarnCluster) throws Exception { ApplicationSubmissionContext appContext = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java new file mode 100644 index 0000000..0b12e3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class TestDistributedScheduling extends TestAMRMProxy { + + private static final Log LOG = + LogFactory.getLog(TestDistributedScheduling.class); + + @Test(timeout = 600000) + public void testDistributedSchedulingE2E() throws Exception { + MiniYARNCluster cluster = + new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); + YarnClient rmClient = null; + ApplicationMasterProtocol client; + + try { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); + cluster.init(conf); + cluster.start(); + final Configuration yarnConf = cluster.getConfig(); + + // the client has to connect to AMRMProxy + + yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); + rmClient = YarnClient.createYarnClient(); + rmClient.init(yarnConf); + rmClient.start(); + + // Submit application + + ApplicationId appId = createApp(rmClient, cluster); + + client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); + + LOG.info("testDistributedSchedulingE2E - Register"); + + RegisterApplicationMasterResponse responseRegister = + client.registerApplicationMaster(RegisterApplicationMasterRequest + .newInstance(NetUtils.getHostname(), 1024, "")); + + Assert.assertNotNull(responseRegister); + Assert.assertNotNull(responseRegister.getQueue()); + Assert.assertNotNull(responseRegister.getApplicationACLs()); + Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); + Assert + .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); + Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); + Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); + + RMApp rmApp = + cluster.getResourceManager().getRMContext().getRMApps().get(appId); + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + + LOG.info("testDistributedSchedulingE2E - Allocate"); + + AllocateRequest request = + createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); + List askList = request.getAskList(); + // Mark all ANY requests as opportunistic + for (ResourceRequest rr : askList) { + if (ResourceRequest.ANY.equals(rr.getResourceName())) { + rr.setExecutionType(ExecutionType.OPPORTUNISTIC); + } + } + + AllocateResponse allocResponse = client.allocate(request); + Assert.assertNotNull(allocResponse); + + // Ensure that all the requests are satisfied immediately + Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); + + LOG.info("testDistributedSchedulingE2E - Finish"); + + FinishApplicationMasterResponse responseFinish = + client.finishApplicationMaster(FinishApplicationMasterRequest + .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); + + Assert.assertNotNull(responseFinish); + + } finally { + if (rmClient != null) { + rmClient.stop(); + } + cluster.stop(); + } + } + + @Override + public void testAMRMProxyE2E() throws Exception { } + + @Override + public void testE2ETokenRenewal() throws Exception { } + + @Override + public void testE2ETokenSwap() throws Exception { } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 35914c6..49bf4d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -23,9 +23,18 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .AllocateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb + .RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -43,6 +52,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .DistSchedAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .DistSchedRegisterResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .AMLivelinessMonitor; @@ -139,18 +152,30 @@ public AllocateResponse allocate(AllocateRequest request) throws // ApplicationMasterProtocol clients RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class); - ApplicationMasterProtocol ampProxy = - (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol - .class, NetUtils.getConnectAddress(server), conf); - RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); + ApplicationMasterProtocolPB ampProxy = + RPC.getProxy(ApplicationMasterProtocolPB + .class, 1, NetUtils.getConnectAddress(server), conf); + RegisterApplicationMasterResponse regResp = + new RegisterApplicationMasterResponsePBImpl( + ampProxy.registerApplicationMaster(null, + ((RegisterApplicationMasterRequestPBImpl)factory + .newRecordInstance( + RegisterApplicationMasterRequest.class)).getProto())); Assert.assertEquals("dummyQueue", regResp.getQueue()); - FinishApplicationMasterResponse finishResp = ampProxy - .finishApplicationMaster(factory.newRecordInstance( - FinishApplicationMasterRequest.class)); + FinishApplicationMasterResponse finishResp = + new FinishApplicationMasterResponsePBImpl( + ampProxy.finishApplicationMaster(null, + ((FinishApplicationMasterRequestPBImpl)factory + .newRecordInstance( + FinishApplicationMasterRequest.class)).getProto() + )); Assert.assertEquals(false, finishResp.getIsUnregistered()); - AllocateResponse allocResp = ampProxy - .allocate(factory.newRecordInstance(AllocateRequest.class)); + AllocateResponse allocResp = + new AllocateResponsePBImpl( + ampProxy.allocate(null, + ((AllocateRequestPBImpl)factory + .newRecordInstance(AllocateRequest.class)).getProto()) + ); Assert.assertEquals(12345, allocResp.getNumClusterNodes()); @@ -158,17 +183,22 @@ public AllocateResponse allocate(AllocateRequest request) throws // DistributedSchedulerProtocol clients as well RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class, ProtobufRpcEngine.class); - DistributedSchedulerProtocol dsProxy = - (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol - .class, NetUtils.getConnectAddress(server), conf); + DistributedSchedulerProtocolPB dsProxy = + RPC.getProxy(DistributedSchedulerProtocolPB + .class, 1, NetUtils.getConnectAddress(server), conf); DistSchedRegisterResponse dsRegResp = - dsProxy.registerApplicationMasterForDistributedScheduling( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); + new DistSchedRegisterResponsePBImpl( + dsProxy.registerApplicationMasterForDistributedScheduling(null, + ((RegisterApplicationMasterRequestPBImpl)factory + .newRecordInstance(RegisterApplicationMasterRequest.class)) + .getProto())); Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); DistSchedAllocateResponse dsAllocResp = - dsProxy.allocateForDistributedScheduling( - factory.newRecordInstance(AllocateRequest.class)); + new DistSchedAllocateResponsePBImpl( + dsProxy.allocateForDistributedScheduling(null, + ((AllocateRequestPBImpl)factory + .newRecordInstance(AllocateRequest.class)).getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2704129..2844082 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -861,6 +861,9 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, localToken); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); + while (rt.getNextInterceptor() != null) { + rt = rt.getNextInterceptor(); + } if (rt instanceof DefaultRequestInterceptor) { ((DefaultRequestInterceptor) rt) .setRMClient(getResourceManager().getApplicationMasterService());