diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5193dbe..9f44a6c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2611,6 +2611,27 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS =
       1;
 
+  public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
+
+  public static final String ROUTER_CLIENTRM_PREFIX =
+      ROUTER_PREFIX + "clientrm.";
+
+  public static final String ROUTER_CLIENTRM_ADDRESS =
+      ROUTER_CLIENTRM_PREFIX + "address";
+  public static final int DEFAULT_ROUTER_CLIENTRM_PORT = 8050;
+  public static final String DEFAULT_ROUTER_CLIENTRM_ADDRESS =
+      "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PORT;
+
+  public static final String ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE =
+      ROUTER_CLIENTRM_PREFIX + "interceptor-class.pipeline";
+  public static final String DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE =
+      "org.apache.hadoop.yarn.server.router.clientrm." +
+          "DefaultClientRequestInterceptor";
+
+  public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
+      ROUTER_CLIENTRM_PREFIX + "cache-max-size";
+  public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java
new file mode 100644
index 0000000..95ddbea
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java
@@ -0,0 +1,49 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * LRU cache with a configurable maximum cache size and access order.
+ */
+public class LRUCacheHashMap<K, V> extends LinkedHashMap<K, V> {
+
+  private static final long serialVersionUID = 1L;
+
+  // Maximum size of the cache
+  private int maxSize;
+
+  /**
+   * Constructor.
+   *
+   * @param maxSize max size of the cache
+   * @param accessOrder true for access-order, false for insertion-order
+   */
+  public LRUCacheHashMap(int maxSize, boolean accessOrder) {
+    super(maxSize, 0.75f, accessOrder);
+    this.maxSize = maxSize;
+  }
+
+  @Override
+  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+    return size() > maxSize;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e1b3700..8f62440 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3127,4 +3127,22 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>
+      The comma separated list of class names that implement the
+      RequestInterceptor interface. This is used by the RouterClientRMService
+      to create the request processing pipeline for users.
+    </description>
+    <name>yarn.router.clientrm.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor</value>
+  </property>
+
+  <property>
+    <description>
+      Size of LRU cache for Router ClientRM Service.
+    </description>
+    <name>yarn.router.clientrm.cache-max-size</name>
+    <value>25</value>
+  </property>
+
 </configuration>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
new file mode 100644
index 0000000..1cbb56c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to validate the correctness of the LRUCacheHashMap.
+ *
+ */
+public class TestLRUCacheHashMap {
+
+  /**
+   * Test if the different entries are generated, and LRU cache is working as
+   * expected.
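+   *
+   * For illustration, a minimal sketch of the eviction behaviour the cache is
+   * expected to show (hypothetical keys, a cache of size 2 with access order
+   * enabled):
+   *
+   * <pre>{@code
+   * LRUCacheHashMap<String, Integer> cache = new LRUCacheHashMap<>(2, true);
+   * cache.put("a", 1);
+   * cache.put("b", 2);
+   * cache.get("a");    // "a" becomes most recently used, "b" is now eldest
+   * cache.put("c", 3); // size exceeds maxSize, removeEldestEntry evicts "b"
+   * // cache.containsKey("a") == true, cache.containsKey("b") == false
+   * }</pre>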
+ */ + @Test + public void testLRUCache() + throws YarnException, IOException, InterruptedException { + + int mapSize = 5; + + LRUCacheHashMap map = + new LRUCacheHashMap(mapSize, true); + + map.put("1", 1); + map.put("2", 2); + map.put("3", 3); + map.put("4", 4); + map.put("5", 5); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 1 to 5 + for (int i = 1; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + map.put("6", 6); + map.put("3", 3); + map.put("7", 7); + map.put("8", 8); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 5 to 8 and the 3 + for (int i = 5; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + Assert.assertTrue(map.containsKey("3")); + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 3bf1b88..ae284ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -215,6 +215,17 @@ + + maven-jar-plugin + + + + test-jar + + test-compile + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java new file mode 100644 index 0000000..719387e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -0,0 +1,508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import 
org.eclipse.jetty.util.log.Log; +import org.junit.Assert; + +import com.google.common.base.Strings; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade + implements ApplicationClientProtocol, ApplicationMasterProtocol { + + private HashMap> applicationContainerIdMap = + new HashMap>(); + private HashMap allocatedContainerMap = + new HashMap(); + private AtomicInteger containerIndex = new AtomicInteger(0); + private Configuration conf; + + public MockResourceManagerFacade(Configuration conf, + int startContainerIndex) { + this.conf = conf; + this.containerIndex.set(startContainerIndex); + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? result.getApplicationAttemptId().toString() : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + Log.getLog().info("Registering application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + Assert.assertFalse( + "The application id is already registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(amrmToken, new ArrayList()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, null, + null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + Log.getLog().info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this application + Assert.assertTrue("The application id is NOT registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse.newInstance( + request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED + ? 
true : false); + } + + protected ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(12345, id); + } + + protected ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } + + @SuppressWarnings("deprecation") + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (request.getAskList() != null && request.getAskList().size() > 0 + && request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Assert.fail("The mock RM implementation does not support receiving " + + "askList and releaseList in the same heartbeat"); + } + + String amrmToken = getAppIdentifier(); + + ArrayList containerList = new ArrayList(); + if (request.getAskList() != null) { + for (ResourceRequest rr : request.getAskList()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + ContainerId containerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); + Container container = Records.newRecord(Container.class); + container.setId(containerId); + container.setPriority(rr.getPriority()); + + // We don't use the node for running containers in the test cases. So + // it is OK to hard code it to some dummy value + NodeId nodeId = + NodeId.newInstance(!Strings.isNullOrEmpty(rr.getResourceName()) + ? rr.getResourceName() : "dummy", 1000); + container.setNodeId(nodeId); + container.setResource(rr.getCapability()); + containerList.add(container); + + synchronized (applicationContainerIdMap) { + // Keep track of the containers returned to this application. We + // will need it in future + Assert.assertTrue( + "The application id is Not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.get(amrmToken); + ids.add(containerId); + this.allocatedContainerMap.put(containerId, container); + } + } + } + } + + if (request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Log.getLog() + .info("Releasing containers: " + request.getReleaseList().size()); + synchronized (applicationContainerIdMap) { + Assert + .assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List ids = applicationContainerIdMap.get(amrmToken); + + for (ContainerId id : request.getReleaseList()) { + boolean found = false; + for (ContainerId c : ids) { + if (c.equals(id)) { + found = true; + break; + } + } + + Assert.assertTrue("ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); + + ids.remove(id); + + // Return the released container back to the AM with new fake Ids. The + // test case does not care about the IDs. The IDs are faked because + // otherwise the LRM will throw duplication identifier exception. 
This + // returning of fake containers is ONLY done for testing purpose - for + // the test code to get confirmation that the sub-cluster resource + // managers received the release request + ContainerId fakeContainerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); + Container fakeContainer = allocatedContainerMap.get(id); + fakeContainer.setId(fakeContainerId); + containerList.add(fakeContainer); + } + } + } + + Log.getLog().info("Allocating containers: " + containerList.size() + + " for application attempt: " + conf.get("AMRMTOKEN")); + + // Always issue a new AMRMToken as if RM rolled master key + Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); + + return AllocateResponse.newInstance(0, new ArrayList(), + containerList, new ArrayList(), null, AMCommand.AM_RESYNC, + 1, null, new ArrayList(), newAMRMToken, + new ArrayList()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId( + ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return GetNewApplicationResponse.newInstance(null, null, null); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return SubmitApplicationResponse.newInstance(); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return KillApplicationResponse.newInstance(true); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return GetClusterMetricsResponse.newInstance(null); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(null); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return GetClusterNodesResponse.newInstance(null); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return GetQueueInfoResponse.newInstance(null); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + 
GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return GetQueueUserAclsInfoResponse.newInstance(null); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return GetDelegationTokenResponse.newInstance(null); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return RenewDelegationTokenResponse.newInstance(0); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return CancelDelegationTokenResponse.newInstance(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return MoveApplicationAcrossQueuesResponse.newInstance(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return GetApplicationAttemptsResponse.newInstance(null); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return GetContainerReportResponse.newInstance(null); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return GetContainersResponse.newInstance(null); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return ReservationSubmissionResponse.newInstance(); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return ReservationListResponse + .newInstance(new ArrayList()); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return ReservationUpdateResponse.newInstance(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return ReservationDeleteResponse.newInstance(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return GetNodesToLabelsResponse + .newInstance(new HashMap>()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return GetClusterNodeLabelsResponse.newInstance(new ArrayList()); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return GetLabelsToNodesResponse.newInstance(null); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return GetNewReservationResponse + .newInstance(ReservationId.newInstance(0, 0)); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return FailApplicationAttemptResponse.newInstance(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + 
UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return UpdateApplicationPriorityResponse.newInstance(null); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return new SignalContainerResponsePBImpl(); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return UpdateApplicationTimeoutsResponse.newInstance(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 1445d2a..29fdf78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -168,6 +168,14 @@ org.fusesource.leveldbjni leveldbjni-all + + + org.apache.hadoop + hadoop-yarn-server-common + 3.0.0-alpha3-SNAPSHOT + test-jar + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java index c962f97..1cbb237 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; public class MockRequestInterceptor extends AbstractRequestInterceptor { @@ -38,22 +39,21 @@ public MockRequestInterceptor() { public void init(AMRMProxyApplicationContext appContext) { super.init(appContext); - mockRM = - new MockResourceManagerFacade(new YarnConfiguration( - super.getConf()), 0); + mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); } @Override public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { + RegisterApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.registerApplicationMaster(request); } @Override public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { + FinishApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.finishApplicationMaster(request); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java deleted file mode 100644 index f584c94..0000000 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ /dev/null @@ -1,514 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Strings; -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; -import 
org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.records.AMCommand; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import 
org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.UpdatedContainer; -import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.eclipse.jetty.util.log.Log; - -/** - * Mock Resource Manager facade implementation that exposes all the methods - * implemented by the YARN RM. The behavior and the values returned by this mock - * implementation is expected by the unit test cases. So please change the - * implementation with care. - */ -public class MockResourceManagerFacade implements - ApplicationMasterProtocol, ApplicationClientProtocol { - - private HashMap> applicationContainerIdMap = - new HashMap>(); - private HashMap allocatedContainerMap = - new HashMap(); - private AtomicInteger containerIndex = new AtomicInteger(0); - private Configuration conf; - - public MockResourceManagerFacade(Configuration conf, - int startContainerIndex) { - this.conf = conf; - this.containerIndex.set(startContainerIndex); - } - - private static String getAppIdentifier() throws IOException { - AMRMTokenIdentifier result = null; - UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); - Set tokenIds = remoteUgi.getTokenIdentifiers(); - for (TokenIdentifier tokenId : tokenIds) { - if (tokenId instanceof AMRMTokenIdentifier) { - result = (AMRMTokenIdentifier) tokenId; - break; - } - } - return result != null ? 
result.getApplicationAttemptId().toString() - : ""; - } - - @Override - public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { - String amrmToken = getAppIdentifier(); - Log.getLog().info("Registering application attempt: " + amrmToken); - - synchronized (applicationContainerIdMap) { - Assert.assertFalse("The application id is already registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, - new ArrayList()); - } - - return RegisterApplicationMasterResponse.newInstance(null, null, null, - null, null, request.getHost(), null); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { - String amrmToken = getAppIdentifier(); - Log.getLog().info("Finishing application attempt: " + amrmToken); - - synchronized (applicationContainerIdMap) { - // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } - } - - return FinishApplicationMasterResponse - .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true - : false); - } - - protected ApplicationId getApplicationId(int id) { - return ApplicationId.newInstance(12345, id); - } - - protected ApplicationAttemptId getApplicationAttemptId(int id) { - return ApplicationAttemptId.newInstance(getApplicationId(id), 1); - } - - @SuppressWarnings("deprecation") - @Override - public AllocateResponse allocate(AllocateRequest request) - throws YarnException, IOException { - if (request.getAskList() != null && request.getAskList().size() > 0 - && request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Assert.fail("The mock RM implementation does not support receiving " - + "askList and releaseList in the same heartbeat"); - } - - String amrmToken = getAppIdentifier(); - - ArrayList containerList = new ArrayList(); - if (request.getAskList() != null) { - for (ResourceRequest rr : request.getAskList()) { - for (int i = 0; i < rr.getNumContainers(); i++) { - ContainerId containerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); - Container container = Records.newRecord(Container.class); - container.setId(containerId); - container.setPriority(rr.getPriority()); - - // We don't use the node for running containers in the test cases. So - // it is OK to hard code it to some dummy value - NodeId nodeId = - NodeId.newInstance( - !Strings.isNullOrEmpty(rr.getResourceName()) ? rr - .getResourceName() : "dummy", 1000); - container.setNodeId(nodeId); - container.setResource(rr.getCapability()); - containerList.add(container); - - synchronized (applicationContainerIdMap) { - // Keep track of the containers returned to this application. 
We - // will need it in future - Assert.assertTrue( - "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = - applicationContainerIdMap.get(amrmToken); - ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); - } - } - } - } - - if (request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Log.getLog().info("Releasing containers: " - + request.getReleaseList().size()); - synchronized (applicationContainerIdMap) { - Assert.assertTrue( - "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); - - for (ContainerId id : request.getReleaseList()) { - boolean found = false; - for (ContainerId c : ids) { - if (c.equals(id)) { - found = true; - break; - } - } - - Assert.assertTrue( - "ContainerId " + id - + " being released is not valid for application: " - + conf.get("AMRMTOKEN"), found); - - ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); - } - } - } - - Log.getLog().info("Allocating containers: " + containerList.size() - + " for application attempt: " + conf.get("AMRMTOKEN")); - - // Always issue a new AMRMToken as if RM rolled master key - Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - - return AllocateResponse.newInstance(0, - new ArrayList(), containerList, - new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, - new ArrayList(), newAMRMToken, - new ArrayList()); - } - - @Override - public GetApplicationReportResponse getApplicationReport( - GetApplicationReportRequest request) throws YarnException, - IOException { - - GetApplicationReportResponse response = - Records.newRecord(GetApplicationReportResponse.class); - ApplicationReport report = Records.newRecord(ApplicationReport.class); - report.setYarnApplicationState(YarnApplicationState.ACCEPTED); - report.setApplicationId(request.getApplicationId()); - report.setCurrentApplicationAttemptId(ApplicationAttemptId - .newInstance(request.getApplicationId(), 1)); - response.setApplicationReport(report); - return response; - } - - @Override - public GetApplicationAttemptReportResponse getApplicationAttemptReport( - GetApplicationAttemptReportRequest request) throws YarnException, - IOException { - GetApplicationAttemptReportResponse response = - Records.newRecord(GetApplicationAttemptReportResponse.class); - ApplicationAttemptReport report = - Records.newRecord(ApplicationAttemptReport.class); - report.setApplicationAttemptId(request.getApplicationAttemptId()); - report - .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); - response.setApplicationAttemptReport(report); - return response; - } - - @Override - public GetNewApplicationResponse getNewApplication( - GetNewApplicationRequest request) throws YarnException, IOException 
{ - return null; - } - - @Override - public SubmitApplicationResponse submitApplication( - SubmitApplicationRequest request) throws YarnException, IOException { - return null; - } - - @Override - public KillApplicationResponse forceKillApplication( - KillApplicationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterMetricsResponse getClusterMetrics( - GetClusterMetricsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetApplicationsResponse getApplications( - GetApplicationsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterNodesResponse getClusterNodes( - GetClusterNodesRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) - throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetQueueUserAclsInfoResponse getQueueUserAcls( - GetQueueUserAclsInfoRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetDelegationTokenResponse getDelegationToken( - GetDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public RenewDelegationTokenResponse renewDelegationToken( - RenewDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public CancelDelegationTokenResponse cancelDelegationToken( - CancelDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetApplicationAttemptsResponse getApplicationAttempts( - GetApplicationAttemptsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetContainerReportResponse getContainerReport( - GetContainerReportRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetContainersResponse getContainers(GetContainersRequest request) - throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetNewReservationResponse getNewReservation( - GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationSubmissionResponse submitReservation( - ReservationSubmissionRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationListResponse listReservations( - ReservationListRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationUpdateResponse updateReservation( - ReservationUpdateRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationDeleteResponse deleteReservation( - ReservationDeleteRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public 
GetNodesToLabelsResponse getNodeToLabels( - GetNodesToLabelsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterNodeLabelsResponse getClusterNodeLabels( - GetClusterNodeLabelsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetLabelsToNodesResponse getLabelsToNodes( - GetLabelsToNodesRequest request) throws YarnException, IOException { - return null; - } - - @Override - public UpdateApplicationPriorityResponse updateApplicationPriority( - UpdateApplicationPriorityRequest request) throws YarnException, - IOException { - return null; - } - - @Override - public SignalContainerResponse signalToContainer( - SignalContainerRequest request) throws IOException { -return null; -} - - @Override - public FailApplicationAttemptResponse failApplicationAttempt( - FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( - UpdateApplicationTimeoutsRequest request) - throws YarnException, IOException { - throw new NotImplementedException(); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index 25afa5c..c4342d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -52,6 +52,20 @@ org.apache.hadoop hadoop-yarn-server-common + + + org.apache.hadoop + hadoop-yarn-server-common + 3.0.0-alpha3-SNAPSHOT + test-jar + test + + + + junit + junit + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 7be8a59..f51eda6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -18,6 +18,20 @@ package org.apache.hadoop.yarn.server.router; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; + /** * The router is a stateless YARN component which is the entry point to the * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with @@ -33,6 +47,88 @@ * This provides a placeholder for throttling mis-behaving clients (YARN-1546) * and masks the access to multiple RMs (YARN-3659). 
*/ -public class Router{ +public class Router extends CompositeService { + + private static final Log LOG = LogFactory.getLog(Router.class); + private static CompositeServiceShutdownHook routerShutdownHook; + private Configuration conf; + private AtomicBoolean isStopping = new AtomicBoolean(false); + private RouterClientRMService clientRMProxyService; + + /** + * Priority of the Router shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + public Router() { + super(Router.class.getName()); + } + + protected void doSecureLogin() throws IOException { + // TODO + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + clientRMProxyService = createClientRMProxyService(); + addService(clientRMProxyService); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + try { + doSecureLogin(); + } catch (IOException e) { + throw new YarnRuntimeException("Failed Router login", e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (isStopping.getAndSet(true)) { + return; + } + super.serviceStop(); + } + + protected void shutDown() { + new Thread() { + @Override + public void run() { + Router.this.stop(); + } + }.start(); + } + + protected RouterClientRMService createClientRMProxyService() { + return new RouterClientRMService(); + } + + public static void main(String argv[]) { + Configuration conf = new YarnConfiguration(); + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(Router.class, argv, LOG); + Router router = new Router(); + try { + + // Remove the old hook if we are rebooting. + if (null != routerShutdownHook) { + ShutdownHookManager.get().removeShutdownHook(routerShutdownHook); + } + + routerShutdownHook = new CompositeServiceShutdownHook(router); + ShutdownHookManager.get().addShutdownHook(routerShutdownHook, + SHUTDOWN_HOOK_PRIORITY); + router.init(conf); + router.start(); + } catch (Throwable t) { + LOG.fatal("Error starting Router", t); + System.exit(-1); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java new file mode 100644 index 0000000..fc6a118 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can be used and/or extended by other concrete interceptor classes.
+ *
+ */
+public abstract class AbstractClientRequestInterceptor
+    implements ClientRequestInterceptor {
+  private Configuration conf;
+  private ClientRequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the {@code RequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration}.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration}.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Initializes the {@code ClientRequestInterceptor}.
+   */
+  @Override
+  public void init(String user) {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(user);
+    }
+  }
+
+  /**
+   * Disposes the {@code ClientRequestInterceptor}.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link ClientRequestInterceptor} in the chain.
+   */
+  @Override
+  public ClientRequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
new file mode 100644
index 0000000..1991763
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+
+/**
+ * Defines the contract to be implemented by the request interceptor classes
+ * that can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface ClientRequestInterceptor
+    extends ApplicationClientProtocol, Configurable {
+  /**
+   * This method is called for initializing the interceptor. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param user the name of the end user on whose behalf this interceptor acts
+   */
+  void init(String user);
+
+  /**
+   * This method is called to release the resources held by the interceptor.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose the resources and forward the
+   * request to the next interceptor, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next interceptor in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last interceptor in the chain is responsible
+   * for sending the messages to the resource manager service, and so it will
+   * not receive this method call.
+   *
+   * @param nextInterceptor the next interceptor in the chain
+   */
+  void setNextInterceptor(ClientRequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next interceptor in the chain.
+   *
+   * @return the next interceptor in the chain
+   */
+  ClientRequestInterceptor getNextInterceptor();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
new file mode 100644
index 0000000..828b5a0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
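As an illustration of the contract above, the following sketch (not part of this patch; the class name and counter are invented) extends AbstractClientRequestInterceptor and forwards each call to the next interceptor in the chain. It is declared abstract so that only two ApplicationClientProtocol methods need to be shown; a concrete interceptor would implement the remaining methods by delegating in exactly the same way.

package org.apache.hadoop.yarn.server.router.clientrm;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
 * Hypothetical interceptor: counts application submissions and forwards every
 * call to the next interceptor in the chain.
 */
public abstract class CountingClientRequestInterceptor
    extends AbstractClientRequestInterceptor {

  private final AtomicLong submissions = new AtomicLong();

  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException, IOException {
    submissions.incrementAndGet();
    return getNextInterceptor().submitApplication(request);
  }

  @Override
  public GetNewApplicationResponse getNewApplication(
      GetNewApplicationRequest request) throws YarnException, IOException {
    return getNextInterceptor().getNewApplication(request);
  }

  public long getSubmissionCount() {
    return submissions.get();
  }
}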
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the AbstractRequestInterceptorClient class and provides an + * implementation that simply forwards the client requests to the cluster + * resource manager. 
+ * + */ +public final class DefaultClientRequestInterceptor + extends AbstractClientRequestInterceptor { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); + private ApplicationClientProtocol clientRMProxy; + private UserGroupInformation user = null; + + @Override + public void init(String userName) { + super.init(userName); + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + + final Configuration conf = this.getConf(); + + clientRMProxy = + user.doAs(new PrivilegedExceptionAction() { + @Override + public ApplicationClientProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } + }); + } catch (IOException e) { + String message = "Error while creating Router ClientRM Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public void setNextInterceptor(ClientRequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return clientRMProxy.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return clientRMProxy.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return clientRMProxy.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return clientRMProxy.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return clientRMProxy.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return clientRMProxy.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return 
clientRMProxy.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return clientRMProxy.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return clientRMProxy.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return clientRMProxy.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return clientRMProxy.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return clientRMProxy.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return clientRMProxy.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return clientRMProxy.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return clientRMProxy.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return clientRMProxy.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return clientRMProxy.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return clientRMProxy.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse 
signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return clientRMProxy.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return clientRMProxy.updateApplicationTimeouts(request); + } + + @VisibleForTesting + public void setRMClient(ApplicationClientProtocol clientRM) { + this.clientRMProxy = clientRM; + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java new file mode 100644 index 0000000..71f0b77 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
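The init() logic of DefaultClientRequestInterceptor above boils down to the standard proxy-user pattern: impersonate the end user on top of the Router's own login identity and create the RM proxy inside a doAs block. A condensed sketch, where the user name "alice" and the helper class are illustrative only:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.client.ClientRMProxy;

public final class RMProxyExample {

  /** Creates an RM client proxy that acts on behalf of the given user. */
  static ApplicationClientProtocol createProxyFor(String userName,
      final Configuration conf) throws Exception {
    // Impersonate the end user on top of the current (Router) login identity.
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(
        userName, UserGroupInformation.getCurrentUser());
    // RPCs made through the returned proxy carry the proxied identity.
    return ugi.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
      @Override
      public ApplicationClientProtocol run() throws Exception {
        return ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
      }
    });
  }

  public static void main(String[] args) throws Exception {
    // "alice" is a made-up user name for illustration.
    ApplicationClientProtocol rm = createProxyFor("alice", new Configuration());
    System.out.println("Created RM proxy: " + rm);
  }
}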
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.LRUCacheHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RouterClientRMService is a service that runs on each router that can be used + * to intercept and inspect ApplicationClientProtocol messages from client to + * the cluster resource manager. It listens ApplicationClientProtocol messages + * from the client and creates a request intercepting pipeline instance for each + * client. The pipeline is a chain of intercepter instances that can inspect and + * modify the request/response as needed. The main difference with + * AMRMProxyService is the protocol they implement. + */ +public class RouterClientRMService extends AbstractService + implements ApplicationClientProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterClientRMService.class); + + private Server server; + private InetSocketAddress listenerEndpoint; + + // For each user we store an interceptors' pipeline. + // For performance issue we use LRU cache to keep in memory the newest ones + // and remove the oldest used ones. 
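To make the eviction behaviour described in the comment above concrete, here is a small illustration; it assumes LRUCacheHashMap carries the usual <K, V> type parameters, and the user names and values are invented:

import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.yarn.util.LRUCacheHashMap;

public class PipelineCacheExample {
  public static void main(String[] args) {
    // Access-ordered cache that keeps at most two per-user entries.
    Map<String, String> cache = Collections.synchronizedMap(
        new LRUCacheHashMap<String, String>(2, true));

    cache.put("alice", "pipeline-for-alice");
    cache.put("bob", "pipeline-for-bob");
    cache.get("alice");                     // touch alice, so bob becomes the eldest
    cache.put("charlie", "pipeline-for-charlie");

    // bob's entry was evicted; alice and charlie remain.
    System.out.println(cache.keySet());     // e.g. [alice, charlie]
  }
}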
+ private Map userPipelineMap; + + public RouterClientRMService() { + super(RouterClientRMService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting Router ClientRMService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT); + + int maxCacheSize = + conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap( + new LRUCacheHashMap( + maxCacheSize, true)); + + Configuration serverConf = new Configuration(conf); + + int numWorkerThreads = + serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); + + this.server = rpc.getServer(ApplicationClientProtocol.class, this, + listenerEndpoint, serverConf, null, numWorkerThreads); + + this.server.start(); + LOG.info("Router ClientRMService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping Router ClientRMService"); + if (this.server != null) { + this.server.stop(); + } + userPipelineMap.clear(); + super.serviceStop(); + } + + /** + * Returns the comma separated intercepter class names from the configuration. + * + * @param conf + * @return the intercepter class names as an instance of ArrayList + */ + private List getInterceptorClassNames(Configuration conf) { + String configuredInterceptorClassNames = conf.get( + YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE); + + List interceptorClassNames = new ArrayList(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + + return interceptorClassNames; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return 
pipeline.getRootInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws 
YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().updateApplicationTimeouts(request); + } + + private RequestInterceptorChainWrapper getInterceptorChain() + throws IOException { + String user = UserGroupInformation.getCurrentUser().getUserName(); + if 
(!userPipelineMap.containsKey(user)) { + initializePipeline(user); + } + return userPipelineMap.get(user); + } + + /** + * Gets the Request intercepter chains for all the users. + * + * @return the request intercepter chains. + */ + @VisibleForTesting + protected Map getPipelines() { + return this.userPipelineMap; + } + + /** + * This method creates and returns reference of the first intercepter in the + * chain of request intercepter instances. + * + * @return the reference of the first intercepter in the chain + */ + @VisibleForTesting + protected ClientRequestInterceptor createRequestInterceptorChain() { + Configuration conf = getConfig(); + + List interceptorClassNames = getInterceptorClassNames(conf); + + ClientRequestInterceptor pipeline = null; + ClientRequestInterceptor current = null; + for (String interceptorClassName : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(interceptorClassName); + if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) { + ClientRequestInterceptor interceptorInstance = + (ClientRequestInterceptor) ReflectionUtils + .newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = interceptorInstance; + current = interceptorInstance; + continue; + } else { + current.setNextInterceptor(interceptorInstance); + current = interceptorInstance; + } + } else { + throw new YarnRuntimeException( + "Class: " + interceptorClassName + " not instance of " + + ClientRequestInterceptor.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate ApplicationClientRequestInterceptor: " + + interceptorClassName, + e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system"); + } + return pipeline; + } + + /** + * Initializes the request intercepter pipeline for the specified application. + * + * @param user + */ + private void initializePipeline(String user) { + RequestInterceptorChainWrapper chainWrapper = null; + synchronized (this.userPipelineMap) { + if (this.userPipelineMap.containsKey(user)) { + LOG.info( + "Request to start an already existing user: {} was received, so ignoring.", + user); + return; + } + + chainWrapper = new RequestInterceptorChainWrapper(); + this.userPipelineMap.put(user, chainWrapper); + } + + // We register the pipeline instance in the map first and then initialize it + // later because chain initialization can be expensive and we would like to + // release the lock as soon as possible to prevent other applications from + // blocking when one application's chain is initializing + LOG.info( + "Initializing request processing pipeline for application for the user: {}", + user); + + try { + ClientRequestInterceptor interceptorChain = + this.createRequestInterceptorChain(); + interceptorChain.init(user); + chainWrapper.init(interceptorChain); + } catch (Exception e) { + synchronized (this.userPipelineMap) { + this.userPipelineMap.remove(user); + } + throw e; + } + } + + /** + * Private structure for encapsulating RequestInterceptor and user instances. + * + */ + @Private + public static class RequestInterceptorChainWrapper { + private ClientRequestInterceptor rootInterceptor; + + /** + * Initializes the wrapper with the specified parameters. + * + * @param rootInterceptor + */ + public synchronized void init(ClientRequestInterceptor rootInterceptor) { + this.rootInterceptor = rootInterceptor; + } + + /** + * Gets the root request intercepter. 
+ * + * @return the root request intercepter + */ + public synchronized ClientRequestInterceptor getRootInterceptor() { + return rootInterceptor; + } + + /** + * Shutdown the chain of interceptors when the object is destroyed + */ + @Override + protected void finalize() { + rootInterceptor.shutdown(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java new file mode 100644 index 0000000..7d1dadd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Router ClientRM Proxy Service package. **/ +package org.apache.hadoop.yarn.server.router.clientrm; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java new file mode 100644 index 0000000..200811d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
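Putting the pieces together, the chain built by createRequestInterceptorChain() above is driven entirely by configuration. A sketch of wiring it up, where CountingClientRequestInterceptor refers to the hypothetical interceptor sketched earlier (not part of this patch) and the cache size is an arbitrary example value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;

public class PipelineConfigExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // First class inspects the request; the last one talks to the RM.
    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
        "org.apache.hadoop.yarn.server.router.clientrm.CountingClientRequestInterceptor,"
            + "org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor");
    // Keep at most 50 per-user pipelines in the LRU cache (illustrative value).
    conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, 50);

    RouterClientRMService service = new RouterClientRMService();
    service.init(conf);
    service.start();   // serves ApplicationClientProtocol on the configured Router address
    // ...
    service.stop();
  }
}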
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the RouterClientRMService test cases. It provides utility + * methods that can be used by the concrete test case classes + * + */ +public abstract class BaseRouterClientRMTest { + + /** + * The RouterClientRMService instance that will be used by all the test cases + */ + private MockRouterClientRMService clientrmService; + /** + * Thread pool used for asynchronous operations + */ + private static ExecutorService threadpool = Executors.newCachedThreadPool(); + private Configuration conf; + private AsyncDispatcher dispatcher; + + public final static int TEST_MAX_CACHE_SIZE = 10; + + protected MockRouterClientRMService getRouterClientRMService() { + Assert.assertNotNull(this.clientrmService); + return this.clientrmService; + } + + @Before + public void setUp() { + this.conf = new YarnConfiguration(); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain will call the mock resource manager. 
The others in the chain will + // simply forward it to the next one in the chain + this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + + MockClientRequestInterceptor.class.getName()); + + this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, + TEST_MAX_CACHE_SIZE); + + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(conf); + this.dispatcher.start(); + this.clientrmService = createAndStartRouterClientRMService(); + } + + @After + public void tearDown() { + if (clientrmService != null) { + clientrmService.stop(); + clientrmService = null; + } + if (this.dispatcher != null) { + this.dispatcher.stop(); + } + } + + protected ExecutorService getThreadPool() { + return threadpool; + } + + protected MockRouterClientRMService createAndStartRouterClientRMService() { + MockRouterClientRMService svc = new MockRouterClientRMService(); + svc.init(conf); + svc.start(); + return svc; + } + + protected static class MockRouterClientRMService + extends RouterClientRMService { + public MockRouterClientRMService() { + super(); + } + } + + protected GetNewApplicationResponse getNewApplication(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewApplicationResponse run() throws Exception { + GetNewApplicationRequest req = + GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = + getRouterClientRMService().getNewApplication(req); + return response; + } + }); + } + + protected SubmitApplicationResponse submitApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public SubmitApplicationResponse run() throws Exception { + ApplicationSubmissionContext context = + ApplicationSubmissionContext.newInstance(appId, "", "", null, + null, false, false, -1, null, null); + SubmitApplicationRequest req = + SubmitApplicationRequest.newInstance(context); + SubmitApplicationResponse response = + getRouterClientRMService().submitApplication(req); + return response; + } + }); + } + + protected KillApplicationResponse forceKillApplication( + final ApplicationId appId, String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public KillApplicationResponse run() throws Exception { + KillApplicationRequest req = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse response = + getRouterClientRMService().forceKillApplication(req); + return response; + } + }); + } + + protected GetClusterMetricsResponse getClusterMetrics(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterMetricsResponse run() throws Exception { + GetClusterMetricsRequest req = + GetClusterMetricsRequest.newInstance(); + GetClusterMetricsResponse response = + getRouterClientRMService().getClusterMetrics(req); + return response; + } + }); + } + + protected GetClusterNodesResponse getClusterNodes(String user) + throws YarnException, IOException, InterruptedException { + 
return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodesResponse run() throws Exception { + GetClusterNodesRequest req = GetClusterNodesRequest.newInstance(); + GetClusterNodesResponse response = + getRouterClientRMService().getClusterNodes(req); + return response; + } + }); + } + + protected GetQueueInfoResponse getQueueInfo(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueInfoResponse run() throws Exception { + GetQueueInfoRequest req = + GetQueueInfoRequest.newInstance("default", false, false, false); + GetQueueInfoResponse response = + getRouterClientRMService().getQueueInfo(req); + return response; + } + }); + } + + protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetQueueUserAclsInfoResponse run() throws Exception { + GetQueueUserAclsInfoRequest req = + GetQueueUserAclsInfoRequest.newInstance(); + GetQueueUserAclsInfoResponse response = + getRouterClientRMService().getQueueUserAcls(req); + return response; + } + }); + } + + protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + String user, final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public MoveApplicationAcrossQueuesResponse run() throws Exception { + + MoveApplicationAcrossQueuesRequest req = + MoveApplicationAcrossQueuesRequest.newInstance(appId, + "newQueue"); + MoveApplicationAcrossQueuesResponse response = + getRouterClientRMService().moveApplicationAcrossQueues(req); + return response; + } + }); + } + + public GetNewReservationResponse getNewReservation(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNewReservationResponse run() throws Exception { + GetNewReservationResponse response = getRouterClientRMService() + .getNewReservation(GetNewReservationRequest.newInstance()); + return response; + } + }); + } + + protected ReservationSubmissionResponse submitReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationSubmissionResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + + ReservationSubmissionRequest req = createSimpleReservationRequest(1, + arrival, deadline, duration, reservationId); + ReservationSubmissionResponse response = + getRouterClientRMService().submitReservation(req); + return response; + } + }); + } + + protected ReservationUpdateResponse updateReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationUpdateResponse run() throws Exception { + Clock clock = new UTCClock(); + long arrival = 
clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationDefinition rDef = + createSimpleReservationRequest(1, arrival, deadline, duration, + reservationId).getReservationDefinition(); + + ReservationUpdateRequest req = + ReservationUpdateRequest.newInstance(rDef, reservationId); + ReservationUpdateResponse response = + getRouterClientRMService().updateReservation(req); + return response; + } + }); + } + + protected ReservationDeleteResponse deleteReservation(String user, + final ReservationId reservationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public ReservationDeleteResponse run() throws Exception { + ReservationDeleteRequest req = + ReservationDeleteRequest.newInstance(reservationId); + ReservationDeleteResponse response = + getRouterClientRMService().deleteReservation(req); + return response; + } + }); + } + + protected GetNodesToLabelsResponse getNodeToLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetNodesToLabelsResponse run() throws Exception { + GetNodesToLabelsRequest req = GetNodesToLabelsRequest.newInstance(); + GetNodesToLabelsResponse response = + getRouterClientRMService().getNodeToLabels(req); + return response; + } + }); + } + + protected GetLabelsToNodesResponse getLabelsToNodes(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetLabelsToNodesResponse run() throws Exception { + GetLabelsToNodesRequest req = GetLabelsToNodesRequest.newInstance(); + GetLabelsToNodesResponse response = + getRouterClientRMService().getLabelsToNodes(req); + return response; + } + }); + } + + protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetClusterNodeLabelsResponse run() throws Exception { + GetClusterNodeLabelsRequest req = + GetClusterNodeLabelsRequest.newInstance(); + GetClusterNodeLabelsResponse response = + getRouterClientRMService().getClusterNodeLabels(req); + return response; + } + }); + } + + protected GetApplicationReportResponse getApplicationReport(String user, + final ApplicationId appId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationReportResponse run() throws Exception { + GetApplicationReportRequest req = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + getRouterClientRMService().getApplicationReport(req); + return response; + } + }); + } + + protected GetApplicationsResponse getApplications(String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationsResponse run() throws Exception { + GetApplicationsRequest req = GetApplicationsRequest.newInstance(); + GetApplicationsResponse response = + getRouterClientRMService().getApplications(req); + return response; + } + }); + } + + protected 
GetApplicationAttemptReportResponse getApplicationAttemptReport( + String user, final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user).doAs( + new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptReportResponse run() throws Exception { + GetApplicationAttemptReportRequest req = + GetApplicationAttemptReportRequest.newInstance(appAttemptId); + GetApplicationAttemptReportResponse response = + getRouterClientRMService().getApplicationAttemptReport(req); + return response; + } + }); + } + + protected GetApplicationAttemptsResponse getApplicationAttempts(String user, + final ApplicationId applicationId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetApplicationAttemptsResponse run() throws Exception { + GetApplicationAttemptsRequest req = + GetApplicationAttemptsRequest.newInstance(applicationId); + GetApplicationAttemptsResponse response = + getRouterClientRMService().getApplicationAttempts(req); + return response; + } + }); + } + + protected GetContainerReportResponse getContainerReport(String user, + final ContainerId containerId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainerReportResponse run() throws Exception { + GetContainerReportRequest req = + GetContainerReportRequest.newInstance(containerId); + GetContainerReportResponse response = + getRouterClientRMService().getContainerReport(req); + return response; + } + }); + } + + protected GetContainersResponse getContainers(String user, + final ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetContainersResponse run() throws Exception { + GetContainersRequest req = + GetContainersRequest.newInstance(appAttemptId); + GetContainersResponse response = + getRouterClientRMService().getContainers(req); + return response; + } + }); + } + + protected GetDelegationTokenResponse getDelegationToken(final String user) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public GetDelegationTokenResponse run() throws Exception { + GetDelegationTokenRequest req = + GetDelegationTokenRequest.newInstance(user); + GetDelegationTokenResponse response = + getRouterClientRMService().getDelegationToken(req); + return response; + } + }); + } + + protected RenewDelegationTokenResponse renewDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new PrivilegedExceptionAction() { + @Override + public RenewDelegationTokenResponse run() throws Exception { + RenewDelegationTokenRequest req = + RenewDelegationTokenRequest.newInstance(token); + RenewDelegationTokenResponse response = + getRouterClientRMService().renewDelegationToken(req); + return response; + } + }); + } + + protected CancelDelegationTokenResponse cancelDelegationToken(String user, + final Token token) + throws YarnException, IOException, InterruptedException { + return UserGroupInformation.createRemoteUser(user) + .doAs(new 
PrivilegedExceptionAction() { + @Override + public CancelDelegationTokenResponse run() throws Exception { + CancelDelegationTokenRequest req = + CancelDelegationTokenRequest.newInstance(token); + CancelDelegationTokenResponse response = + getRouterClientRMService().cancelDelegationToken(req); + return response; + } + }); + } + + private ReservationSubmissionRequest createSimpleReservationRequest( + int numContainers, long arrival, long deadline, long duration, + ReservationId reservationId) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest + .newInstance(Resource.newInstance(1024, 1), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance(arrival, + deadline, reqs, "testRouterClientRMService#reservation"); + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(rDef, "dedicated", reservationId); + return request; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java new file mode 100644 index 0000000..b9f5a3f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.server.router.clientrm.AbstractClientRequestInterceptor; + +public class MockClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + private MockResourceManagerFacade mockRM; + + public MockClientRequestInterceptor() { + } + + public void init(String user) { + super.init(user); + mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return mockRM.getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return mockRM.submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return mockRM.forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return mockRM.getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return mockRM.getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return mockRM.getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return mockRM.getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return mockRM.moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return mockRM.getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse 
submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return mockRM.submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return mockRM.listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return mockRM.updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return mockRM.deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return mockRM.getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return mockRM.getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return mockRM.getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return mockRM.getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return mockRM.getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return mockRM.getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return mockRM.getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return mockRM.getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return mockRM.getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return mockRM.cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return mockRM.failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return mockRM.updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + 
SignalContainerRequest request) throws YarnException, IOException { + return mockRM.signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return mockRM.updateApplicationTimeouts(request); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java new file mode 100644 index 0000000..ffabd68 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.router.clientrm.AbstractClientRequestInterceptor; + +/** + * Mock interceptor that does nothing other than forward each request to the + * next interceptor in the chain. + * + */ +public class PassThroughClientRequestInterceptor + extends AbstractClientRequestInterceptor { + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewApplication(request); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().submitApplication(request); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return getNextInterceptor().forceKillApplication(request); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterMetrics(request); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return getNextInterceptor().getClusterNodes(request); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return getNextInterceptor().getQueueInfo(request); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return getNextInterceptor().getQueueUserAcls(request); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return getNextInterceptor().moveApplicationAcrossQueues(request); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return getNextInterceptor().getNewReservation(request); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return
getNextInterceptor().submitReservation(request); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return getNextInterceptor().listReservations(request); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return getNextInterceptor().updateReservation(request); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return getNextInterceptor().deleteReservation(request); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getNodeToLabels(request); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return getNextInterceptor().getLabelsToNodes(request); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return getNextInterceptor().getClusterNodeLabels(request); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationReport(request); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplications(request); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + return getNextInterceptor().getApplicationAttemptReport(request); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return getNextInterceptor().getApplicationAttempts(request); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return getNextInterceptor().getContainerReport(request); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return getNextInterceptor().getContainers(request); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().getDelegationToken(request); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().renewDelegationToken(request); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return getNextInterceptor().cancelDelegationToken(request); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return getNextInterceptor().failApplicationAttempt(request); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + 
return getNextInterceptor().updateApplicationPriority(request); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return getNextInterceptor().signalToContainer(request); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return getNextInterceptor().updateApplicationTimeouts(request); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java new file mode 100644 index 0000000..818735f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterClientRMService.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.router.clientrm.ClientRequestInterceptor; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper; +import org.junit.Assert; +import org.junit.Test; + +public class TestRouterClientRMService extends BaseRouterClientRMTest { + + private static final Log LOG = + LogFactory.getLog(TestRouterClientRMService.class); + + /** + * Test if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + ClientRequestInterceptor root = + super.getRouterClientRMService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + // The current pipeline is: + // PassThroughClientRequestInterceptor - index = 0 + // PassThroughClientRequestInterceptor - index = 1 + // PassThroughClientRequestInterceptor - index = 2 + // MockClientRequestInterceptor - index = 3 + switch (index) { + case 0: // Fall to the next case + case 1: // Fall to the next case + case 2: + // If index is equal to 0,1 or 2 we fall in this check + Assert.assertEquals(PassThroughClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockClientRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + } + root = root.getNextInterceptor(); + index++; + } + Assert.assertEquals("The number of interceptors in chain does not match", 4, + index); + } + + /** + * Test if the RouterClientRM forwards all the requests to the MockRM and get + * back the responses. 
+ */ + @Test + public void testRouterClientRMServiceE2E() throws Exception { + + String user = "test1"; + + LOG.info("testRouterClientRMServiceE2E - Get New Application"); + + GetNewApplicationResponse responseGetNewApp = getNewApplication(user); + Assert.assertNotNull(responseGetNewApp); + + LOG.info("testRouterClientRMServiceE2E - Submit Application"); + + SubmitApplicationResponse responseSubmitApp = + submitApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseSubmitApp); + + LOG.info("testRouterClientRMServiceE2E - Kill Application"); + + KillApplicationResponse responseKillApp = + forceKillApplication(responseGetNewApp.getApplicationId(), user); + Assert.assertNotNull(responseKillApp); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Metrics"); + + GetClusterMetricsResponse responseGetClusterMetrics = + getClusterMetrics(user); + Assert.assertNotNull(responseGetClusterMetrics); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Nodes"); + + GetClusterNodesResponse responseGetClusterNodes = getClusterNodes(user); + Assert.assertNotNull(responseGetClusterNodes); + + LOG.info("testRouterClientRMServiceE2E - Get Queue Info"); + + GetQueueInfoResponse responseGetQueueInfo = getQueueInfo(user); + Assert.assertNotNull(responseGetQueueInfo); + + LOG.info("testRouterClientRMServiceE2E - Get Queue User"); + + GetQueueUserAclsInfoResponse responseGetQueueUser = getQueueUserAcls(user); + Assert.assertNotNull(responseGetQueueUser); + + LOG.info("testRouterClientRMServiceE2E - Get Cluster Node"); + + GetClusterNodeLabelsResponse responseGetClusterNode = + getClusterNodeLabels(user); + Assert.assertNotNull(responseGetClusterNode); + + LOG.info("testRouterClientRMServiceE2E - Move Application Across Queues"); + + MoveApplicationAcrossQueuesResponse responseMoveApp = + moveApplicationAcrossQueues(user, responseGetNewApp.getApplicationId()); + Assert.assertNotNull(responseMoveApp); + + LOG.info("testRouterClientRMServiceE2E - Get New Reservation"); + + GetNewReservationResponse getNewReservationResponse = + getNewReservation(user); + + LOG.info("testRouterClientRMServiceE2E - Submit Reservation"); + + ReservationSubmissionResponse responseSubmitReser = + submitReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseSubmitReser); + + LOG.info("testRouterClientRMServiceE2E - Update Reservation"); + + ReservationUpdateResponse responseUpdateReser = + updateReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseUpdateReser); + + LOG.info("testRouterClientRMServiceE2E - Delete Reservation"); + + ReservationDeleteResponse responseDeleteReser = + deleteReservation(user, getNewReservationResponse.getReservationId()); + Assert.assertNotNull(responseDeleteReser); + } + + /** + * Test if the different chains for users are generated, and the LRU cache + * is working as expected. + */ + @Test + public void testUsersChainMapWithLRUCache() + throws YarnException, IOException, InterruptedException { + + Map<String, RequestInterceptorChainWrapper> pipelines; + RequestInterceptorChainWrapper chain; + + getNewApplication("test1"); + getNewApplication("test2"); + getNewApplication("test3"); + getNewApplication("test4"); + getNewApplication("test5"); + getNewApplication("test6"); + getNewApplication("test7"); + getNewApplication("test8"); + + pipelines = super.getRouterClientRMService().getPipelines(); + Assert.assertEquals(8, pipelines.size()); + + getNewApplication("test9"); + getNewApplication("test10"); + getNewApplication("test1"); +
getNewApplication("test11"); + + // The cache max size is defined in + // BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE + Assert.assertEquals(10, pipelines.size()); + + chain = pipelines.get("test1"); + Assert.assertNotNull("test1 should not be evicted", chain); + + chain = pipelines.get("test2"); + Assert.assertNull("test2 should have been evicted", chain); + } + +}
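Note on the eviction assertions above: the expected outcome follows from access-order LRU semantics. The sketch below (illustration only, not part of the patch; the class and helper names are arbitrary) replays the same key sequence against a plain access-ordered java.util.LinkedHashMap. It assumes a maximum cache size of 10, which is what the size assertion implies BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE to be.

import java.util.LinkedHashMap;
import java.util.Map;

public final class LruEvictionSketch {

  public static void main(String[] args) {
    final int maxSize = 10;

    // Access-ordered map: every get()/put() moves the entry to the tail,
    // so the head of the iteration order is always the least recently used.
    Map<String, String> cache = new LinkedHashMap<>(16, 0.75f, true);

    // Same sequence the test drives through getNewApplication(...).
    for (int i = 1; i <= 10; i++) {
      put(cache, "test" + i, maxSize);
    }
    cache.get("test1");            // "test1" becomes most recently used
    put(cache, "test11", maxSize); // exceeds the limit, evicting the LRU entry

    System.out.println(cache.containsKey("test1")); // true: recently accessed
    System.out.println(cache.containsKey("test2")); // false: evicted as LRU
  }

  // Insert a key, then trim the map back to maxSize by dropping the entry at
  // the head of the iteration order, i.e. the least recently used one.
  private static void put(Map<String, String> cache, String key, int maxSize) {
    cache.put(key, "pipeline");
    if (cache.size() > maxSize) {
      String lru = cache.keySet().iterator().next();
      cache.remove(lru);
    }
  }
}

Because "test1" is accessed again immediately before "test11" is inserted, "test2" is the least recently used entry at the moment the cache exceeds its limit, which is exactly what the two assertions at the end of testUsersChainMapWithLRUCache verify.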