diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 9afc5cd..4678082 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -41,9 +41,7 @@ YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.RM_WEBAPP_ADDRESS, - // TODO Remove after YARN-1318 - YarnConfiguration.RM_HA_ADMIN_ADDRESS)); + YarnConfiguration.RM_WEBAPP_ADDRESS)); public static final String BAD_CONFIG_MESSAGE_PREFIX = "Invalid configuration! "; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e..fbc1489 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -283,18 +283,6 @@ public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; - @org.apache.hadoop.classification.InterfaceAudience.Private - // TODO Remove after YARN-1318 - public static final String RM_HA_ADMIN_ADDRESS = - RM_HA_PREFIX + "admin.address"; - public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034; - public static String DEFAULT_RM_HA_ADMIN_ADDRESS = - "0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT; - public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT = - RM_HA_PREFIX + "admin.client.thread-count"; - public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1; - // end @Private - //////////////////////////////// // RM state store configs //////////////////////////////// @@ -766,11 +754,6 @@ public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; - @org.apache.hadoop.classification.InterfaceAudience.Private - // TODO Remove after YARN-1318 - public static final String - YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL = - CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL; /** No. of milliseconds to wait between sending a SIGTERM and SIGKILL * to a running container */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java index bb07bf8..74cb499 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java @@ -32,9 +32,9 @@ public RMHAServiceTarget(YarnConfiguration conf) throws IOException { haAdminServiceAddress = conf.getSocketAddr( - YarnConfiguration.RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index f5ae5e8..07f76c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,17 +21,29 @@ import java.io.IOException; import java.net.InetSocketAddress; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -51,22 +63,19 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; -public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol { +public class AdminService extends AbstractService implements + HAServiceProtocol, ResourceManagerAdministrationProtocol { private static final Log LOG = LogFactory.getLog(AdminService.class); - private final Configuration conf; - private final ResourceScheduler scheduler; private final RMContext rmContext; - private final NodesListManager nodesListManager; - - private final ClientRMService clientRMService; - private final ApplicationMasterService applicationMasterService; - private final ResourceTrackerService resourceTrackerService; - + private final ResourceManager rm; + private HAServiceProtocol.HAServiceState haState; + private boolean haEnabled; + + private Server server; private InetSocketAddress masterServiceAddress; private AccessControlList adminAcl; @@ -74,23 +83,30 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public AdminService(Configuration conf, ResourceScheduler scheduler, - RMContext rmContext, NodesListManager nodesListManager, - ClientRMService clientRMService, - ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { + public AdminService(ResourceManager rm, RMContext rmContext) { super(AdminService.class.getName()); - this.conf = conf; - this.scheduler = scheduler; + this.rm = rm; this.rmContext = rmContext; - this.nodesListManager = nodesListManager; - this.clientRMService = clientRMService; - this.applicationMasterService = applicationMasterService; - this.resourceTrackerService = resourceTrackerService; + setHaState(HAServiceState.INITIALIZING); + } + + private synchronized HAServiceState getHaState() { + return this.haState; + } + + private synchronized void setHaState(HAServiceState haServiceState) { + this.haState = haServiceState; } @Override public void serviceInit(Configuration conf) throws Exception { + haEnabled = HAUtil.isHAEnabled(conf); + if (haEnabled) { + HAUtil.verifyAndSetConfiguration(conf); + rm.setConf(conf); + } + rm.createAndInitActiveServices(); + masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, @@ -103,49 +119,186 @@ public void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { + if (haEnabled) { + transitionToStandby(true); + } else { + transitionToActive(); + } + startServer(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + stopServer(); + transitionToStandby(false); + setHaState(HAServiceState.STOPPING); + super.serviceStop(); + } + + protected void startServer() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); - this.server = - rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, - conf, null, - conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); - + this.server = (Server) rpc.getServer( + ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, + conf, null, + conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); + // Enable service authorization? if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new RMPolicyProvider()); } + if (haEnabled) { + RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, + ProtobufRpcEngine.class); + + HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = + new HAServiceProtocolServerSideTranslatorPB(this); + BlockingService haPbService = + HAServiceProtocolProtos.HAServiceProtocolService + .newReflectiveBlockingService(haServiceProtocolXlator); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + HAServiceProtocol.class, haPbService); + } + this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS, - server.getListenerAddress()); - super.serviceStart(); + server.getListenerAddress()); } - @Override - protected void serviceStop() throws Exception { + protected void stopServer() throws Exception { if (this.server != null) { this.server.stop(); } - super.serviceStop(); + } + + private UserGroupInformation checkAccess(String method) throws IOException { + return RMServerUtils.verifyAccess(adminAcl, method, LOG); } private UserGroupInformation checkAcls(String method) throws YarnException { try { - return RMServerUtils.verifyAccess(adminAcl, method, LOG); + return checkAccess(method); } catch (IOException ioe) { throw RPCUtil.getRemoteException(ioe); } } - + + private synchronized boolean isRMActive() { + return HAServiceState.ACTIVE == getHaState(); + } + + @Override + public synchronized void monitorHealth() + throws IOException { + checkAccess("monitorHealth"); + if (getHaState() == HAServiceProtocol.HAServiceState.ACTIVE && + !rm.areActiveServicesRunning()) { + throw new HealthCheckFailedException( + "Active ResourceManager services are not running!"); + } + } + + private synchronized void transitionToActive() throws Exception { + if (getHaState() == HAServiceProtocol.HAServiceState.ACTIVE) { + LOG.info("Already in active state"); + return; + } + + LOG.info("Transitioning to active"); + rm.startActiveServices(); + setHaState(HAServiceProtocol.HAServiceState.ACTIVE); + LOG.info("Transitioned to active"); + } + + @Override + public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToActive"); + // TODO (YARN-1177): When automatic failover is enabled, + // check if transition should be allowed for this request + try { + transitionToActive(); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToActive", "RMHAProtocolService"); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to active"); + throw new ServiceFailedException( + "Error when transitioning to Active mode", e); + } + } + + private synchronized void transitionToStandby(boolean initialize) + throws Exception { + if (getHaState() == HAServiceProtocol.HAServiceState.STANDBY) { + LOG.info("Already in standby state"); + return; + } + + LOG.info("Transitioning to standby"); + if (getHaState() == HAServiceProtocol.HAServiceState.ACTIVE) { + rm.stopActiveServices(); + if (initialize) { + rm.createAndInitActiveServices(); + } + } + setHaState(HAServiceProtocol.HAServiceState.STANDBY); + LOG.info("Transitioned to standby"); + } + + @Override + public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToStandby"); + // TODO (YARN-1177): When automatic failover is enabled, + // check if transition should be allowed for this request + try { + transitionToStandby(true); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToStandby", "RMHAProtocolService"); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to standby"); + throw new ServiceFailedException( + "Error when transitioning to Standby mode", e); + } + } + + @Override + public synchronized HAServiceStatus getServiceStatus() throws IOException { + checkAccess("getServiceState"); + HAServiceState currentState = getHaState(); + HAServiceStatus ret = new HAServiceStatus(currentState); + if (currentState == HAServiceProtocol.HAServiceState.ACTIVE || + currentState == HAServiceProtocol.HAServiceState.STANDBY) { + ret.setReadyToBecomeActive(); + } else { + ret.setNotReadyToBecomeActive("State is " + currentState); + } + return ret; + } + @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshQueues"); + + if (!isRMActive()) { + LOG.error("Couldn't refreshQueues since RM is not active"); + RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not refresh queues"); + } + try { - scheduler.reinitialize(conf, this.rmContext); + rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", "AdminService"); return recordFactory.newRecordInstance(RefreshQueuesResponse.class); @@ -162,8 +315,16 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshNodes"); + + if (!isRMActive()) { + LOG.error("Couldn't refreshNodes since RM is not active"); + RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not refresh nodes"); + } + try { - this.nodesListManager.refreshNodes(new YarnConfiguration()); + rmContext.getNodesListManager().refreshNodes(new YarnConfiguration()); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes", "AdminService"); return recordFactory.newRecordInstance(RefreshNodesResponse.class); @@ -233,9 +394,16 @@ public RefreshServiceAclsResponse refreshServiceAcls( PolicyProvider policyProvider = new RMPolicyProvider(); refreshServiceAcls(conf, policyProvider); - clientRMService.refreshServiceAcls(conf, policyProvider); - applicationMasterService.refreshServiceAcls(conf, policyProvider); - resourceTrackerService.refreshServiceAcls(conf, policyProvider); + if (isRMActive()) { + rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); + rmContext.getApplicationMasterService().refreshServiceAcls( + conf, policyProvider); + rmContext.getResourceTrackerService().refreshServiceAcls( + conf, policyProvider); + } else { + LOG.warn("ResourceManager is not active. Not refreshing ACLs for " + + "Clients, ApplicationMasters and NodeManagers"); + } return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); } @@ -249,5 +417,4 @@ void refreshServiceAcls(Configuration configuration, public String[] getGroupsForUser(String user) throws IOException { return UserGroupInformation.createRemoteUser(user).getGroupNames(); } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 28101cc..3e8bd79 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -64,14 +66,55 @@ NMTokenSecretManagerInRM getNMTokenSecretManager(); + ResourceScheduler getScheduler(); + + NodesListManager getNodesListManager(); + ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager(); - - void setClientRMService(ClientRMService clientRMService); - + ClientRMService getClientRMService(); + + ApplicationMasterService getApplicationMasterService(); + + ResourceTrackerService getResourceTrackerService(); + + void setDispatcher(Dispatcher dispatcher); + + void setStateStore(RMStateStore stateStore); + void setClientRMService(ClientRMService clientRMService); + RMDelegationTokenSecretManager getRMDelegationTokenSecretManager(); void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager); + + void setContainerAllocationExpirer( + ContainerAllocationExpirer containerAllocationExpirer); + + void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor); + + void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor); + + void setContainerTokenSecretManager( + RMContainerTokenSecretManager containerTokenSecretManager); + + void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager); + + void setScheduler(ResourceScheduler scheduler); + + void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer); + + void setClientToAMTokenSecretManager( + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager); + + void setAMRMTokenSecretManager( + AMRMTokenSecretManager amRMTokenSecretManager); + + void setNodesListManager(NodesListManager nodesListManager); + + void setApplicationMasterService( + ApplicationMasterService applicationMasterService); + + void setResourceTrackerService(ResourceTrackerService resourceTrackerService); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d2592ed..6b16ed3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -42,7 +44,7 @@ public class RMContextImpl implements RMContext { - private final Dispatcher rmDispatcher; + private Dispatcher rmDispatcher; private final ConcurrentMap applications = new ConcurrentHashMap(); @@ -57,34 +59,24 @@ private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; - private final DelegationTokenRenewer delegationTokenRenewer; - private final AMRMTokenSecretManager amRMTokenSecretManager; - private final RMContainerTokenSecretManager containerTokenSecretManager; - private final NMTokenSecretManagerInRM nmTokenSecretManager; - private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private DelegationTokenRenewer delegationTokenRenewer; + private AMRMTokenSecretManager amRMTokenSecretManager; + private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; + private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; private ClientRMService clientRMService; private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; + private ResourceScheduler scheduler; + private NodesListManager nodesListManager; + private ResourceTrackerService resourceTrackerService; + private ApplicationMasterService applicationMasterService; + + /** + * Default constructor. To be used in conjunction with setter methods for + * individual fields. + */ + public RMContextImpl() { - public RMContextImpl(Dispatcher rmDispatcher, - RMStateStore store, - ContainerAllocationExpirer containerAllocationExpirer, - AMLivelinessMonitor amLivelinessMonitor, - AMLivelinessMonitor amFinishingMonitor, - DelegationTokenRenewer delegationTokenRenewer, - AMRMTokenSecretManager amRMTokenSecretManager, - RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager, - ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this.rmDispatcher = rmDispatcher; - this.stateStore = store; - this.containerAllocationExpirer = containerAllocationExpirer; - this.amLivelinessMonitor = amLivelinessMonitor; - this.amFinishingMonitor = amFinishingMonitor; - this.delegationTokenRenewer = delegationTokenRenewer; - this.amRMTokenSecretManager = amRMTokenSecretManager; - this.containerTokenSecretManager = containerTokenSecretManager; - this.nmTokenSecretManager = nmTokenSecretManager; - this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; } @VisibleForTesting @@ -98,10 +90,17 @@ public RMContextImpl(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, - containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager); + this(); + this.setDispatcher(rmDispatcher); + this.setContainerAllocationExpirer(containerAllocationExpirer); + this.setAMLivelinessMonitor(amLivelinessMonitor); + this.setAMFinishingMonitor(amFinishingMonitor); + this.setDelegationTokenRenewer(delegationTokenRenewer); + this.setAMRMTokenSecretManager(appTokenSecretManager); + this.setContainerTokenSecretManager(containerTokenSecretManager); + this.setNMTokenSecretManager(nmTokenSecretManager); + this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); + RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); try { @@ -171,7 +170,17 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { public NMTokenSecretManagerInRM getNMTokenSecretManager() { return this.nmTokenSecretManager; } - + + @Override + public ResourceScheduler getScheduler() { + return this.scheduler; + } + + @Override + public NodesListManager getNodesListManager() { + return this.nodesListManager; + } + @Override public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return this.clientToAMTokenSecretManager; @@ -186,7 +195,22 @@ public void setStateStore(RMStateStore store) { public ClientRMService getClientRMService() { return this.clientRMService; } - + + @Override + public ApplicationMasterService getApplicationMasterService() { + return applicationMasterService; + } + + @Override + public ResourceTrackerService getResourceTrackerService() { + return resourceTrackerService; + } + + @Override + public void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + @Override public void setClientRMService(ClientRMService clientRMService) { this.clientRMService = clientRMService; @@ -202,4 +226,71 @@ public void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager delegationTokenSecretManager) { this.rmDelegationTokenSecretManager = delegationTokenSecretManager; } + + @Override + public void setContainerAllocationExpirer( + ContainerAllocationExpirer containerAllocationExpirer) { + this.containerAllocationExpirer = containerAllocationExpirer; + } + + @Override + public void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { + this.amLivelinessMonitor = amLivelinessMonitor; + } + + @Override + public void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { + this.amFinishingMonitor = amFinishingMonitor; + } + + @Override + public void setContainerTokenSecretManager( + RMContainerTokenSecretManager containerTokenSecretManager) { + this.containerTokenSecretManager = containerTokenSecretManager; + } + + @Override + public void setNMTokenSecretManager( + NMTokenSecretManagerInRM nmTokenSecretManager) { + this.nmTokenSecretManager = nmTokenSecretManager; + } + + @Override + public void setScheduler(ResourceScheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public void setDelegationTokenRenewer( + DelegationTokenRenewer delegationTokenRenewer) { + this.delegationTokenRenewer = delegationTokenRenewer; + } + + @Override + public void setClientToAMTokenSecretManager( + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + } + + public void setAMRMTokenSecretManager( + AMRMTokenSecretManager amRMTokenSecretManager) { + this.amRMTokenSecretManager = amRMTokenSecretManager; + } + + @Override + public void setNodesListManager(NodesListManager nodesListManager) { + this.nodesListManager = nodesListManager; + } + + @Override + public void setApplicationMasterService( + ApplicationMasterService applicationMasterService) { + this.applicationMasterService = applicationMasterService; + } + + @Override + public void setResourceTrackerService( + ResourceTrackerService resourceTrackerService) { + this.resourceTrackerService = resourceTrackerService; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java deleted file mode 100644 index c74b282..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import com.google.common.annotations.VisibleForTesting; - -import com.google.protobuf.BlockingService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceStatus; -import org.apache.hadoop.ha.HealthCheckFailedException; -import org.apache.hadoop.ha.ServiceFailedException; -import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.WritableRpcEngine; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.HAUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * Internal class to handle HA related aspects of the {@link ResourceManager}. - * - * TODO (YARN-1318): Some/ all of this functionality should be merged with - * {@link AdminService}. Currently, marking this as Private and Unstable for - * those reasons. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class RMHAProtocolService extends AbstractService implements - HAServiceProtocol { - private static final Log LOG = LogFactory.getLog(RMHAProtocolService.class); - - private Configuration conf; - private ResourceManager rm; - @VisibleForTesting - protected HAServiceState haState = HAServiceState.INITIALIZING; - private AccessControlList adminAcl; - private Server haAdminServer; - private boolean haEnabled; - - public RMHAProtocolService(ResourceManager resourceManager) { - super("RMHAProtocolService"); - this.rm = resourceManager; - } - - @Override - protected synchronized void serviceInit(Configuration conf) throws - Exception { - this.conf = conf; - haEnabled = HAUtil.isHAEnabled(this.conf); - if (haEnabled) { - HAUtil.verifyAndSetConfiguration(conf); - rm.setConf(this.conf); - adminAcl = new AccessControlList(conf.get( - YarnConfiguration.YARN_ADMIN_ACL, - YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); - } - rm.createAndInitActiveServices(); - super.serviceInit(this.conf); - } - - @Override - protected synchronized void serviceStart() throws Exception { - if (haEnabled) { - transitionToStandby(true); - startHAAdminServer(); - } else { - transitionToActive(); - } - - super.serviceStart(); - } - - @Override - protected synchronized void serviceStop() throws Exception { - if (haEnabled) { - stopHAAdminServer(); - } - transitionToStandby(false); - haState = HAServiceState.STOPPING; - super.serviceStop(); - } - - - protected void startHAAdminServer() throws Exception { - InetSocketAddress haAdminServiceAddress = conf.getSocketAddr( - YarnConfiguration.RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); - - RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, - ProtobufRpcEngine.class); - - HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = - new HAServiceProtocolServerSideTranslatorPB(this); - BlockingService haPbService = - HAServiceProtocolProtos.HAServiceProtocolService - .newReflectiveBlockingService(haServiceProtocolXlator); - - WritableRpcEngine.ensureInitialized(); - - String bindHost = haAdminServiceAddress.getHostName(); - - int serviceHandlerCount = conf.getInt( - YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT); - - haAdminServer = new RPC.Builder(conf) - .setProtocol(HAServiceProtocolPB.class) - .setInstance(haPbService) - .setBindAddress(bindHost) - .setPort(haAdminServiceAddress.getPort()) - .setNumHandlers(serviceHandlerCount) - .setVerbose(false) - .build(); - - // Enable service authorization? - if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider()); - } - - haAdminServer.start(); - conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS, - haAdminServer.getListenerAddress()); - } - - private void stopHAAdminServer() throws Exception { - if (haAdminServer != null) { - haAdminServer.stop(); - haAdminServer.join(); - haAdminServer = null; - } - } - - @Override - public synchronized void monitorHealth() - throws IOException { - checkAccess("monitorHealth"); - if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { - throw new HealthCheckFailedException( - "Active ResourceManager services are not running!"); - } - } - - private synchronized void transitionToActive() throws Exception { - if (haState == HAServiceState.ACTIVE) { - LOG.info("Already in active state"); - return; - } - - LOG.info("Transitioning to active"); - rm.startActiveServices(); - haState = HAServiceState.ACTIVE; - LOG.info("Transitioned to active"); - } - - @Override - public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) - throws IOException { - UserGroupInformation user = checkAccess("transitionToActive"); - // TODO (YARN-1177): When automatic failover is enabled, - // check if transition should be allowed for this request - try { - transitionToActive(); - RMAuditLogger.logSuccess(user.getShortUserName(), - "transitionToActive", "RMHAProtocolService"); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - adminAcl.toString(), "RMHAProtocolService", - "Exception transitioning to active"); - throw new ServiceFailedException( - "Error when transitioning to Active mode", e); - } - } - - private synchronized void transitionToStandby(boolean initialize) - throws Exception { - if (haState == HAServiceState.STANDBY) { - LOG.info("Already in standby state"); - return; - } - - LOG.info("Transitioning to standby"); - if (haState == HAServiceState.ACTIVE) { - rm.stopActiveServices(); - if (initialize) { - rm.createAndInitActiveServices(); - } - } - haState = HAServiceState.STANDBY; - LOG.info("Transitioned to standby"); - } - - @Override - public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) - throws IOException { - UserGroupInformation user = checkAccess("transitionToStandby"); - // TODO (YARN-1177): When automatic failover is enabled, - // check if transition should be allowed for this request - try { - transitionToStandby(true); - RMAuditLogger.logSuccess(user.getShortUserName(), - "transitionToStandby", "RMHAProtocolService"); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", - adminAcl.toString(), "RMHAProtocolService", - "Exception transitioning to standby"); - throw new ServiceFailedException( - "Error when transitioning to Standby mode", e); - } - } - - @Override - public synchronized HAServiceStatus getServiceStatus() throws IOException { - checkAccess("getServiceState"); - HAServiceStatus ret = new HAServiceStatus(haState); - if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) { - ret.setReadyToBecomeActive(); - } else { - ret.setNotReadyToBecomeActive("State is " + haState); - } - return ret; - } - - private UserGroupInformation checkAccess(String method) throws IOException { - return RMServerUtils.verifyAccess(adminAcl, method, LOG); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index f9ee097..54b1f18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -116,7 +116,9 @@ * the HA state of the RM. */ @VisibleForTesting - protected RMHAProtocolService haService; + protected RMContext rmContext; + @VisibleForTesting + protected AdminService adminService; /** * "Active" services. Services that need to run only on the Active RM. @@ -127,8 +129,7 @@ * in Active state. */ protected RMActiveServices activeServices; - protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager = - new ClientToAMTokenSecretManagerInRM(); + protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager; protected RMContainerTokenSecretManager containerTokenSecretManager; protected NMTokenSecretManagerInRM nmTokenSecretManager; @@ -141,7 +142,6 @@ private ClientRMService clientRM; protected ApplicationMasterService masterService; private ApplicationMasterLauncher applicationMasterLauncher; - private AdminService adminService; private ContainerAllocationExpirer containerAllocationExpirer; protected NMLivelinessMonitor nmLivelinessMonitor; protected NodesListManager nodesListManager; @@ -152,7 +152,6 @@ protected RMDelegationTokenSecretManager rmDTSecretManager; private DelegationTokenRenewer delegationTokenRenewer; private WebApp webApp; - protected RMContext rmContext; protected ResourceTrackerService resourceTracker; private boolean recoveryEnabled; @@ -181,9 +180,11 @@ protected static void setClusterTimeStamp(long timestamp) { protected void serviceInit(Configuration conf) throws Exception { validateConfigs(conf); this.conf = conf; + this.rmContext = new RMContextImpl(); + + adminService = createAdminService(); + addService(adminService); - haService = createRMHAProtocolService(); - addService(haService); super.serviceInit(conf); } @@ -195,11 +196,7 @@ protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, @VisibleForTesting protected void setRMStateStore(RMStateStore rmStore) { rmStore.setRMDispatcher(rmDispatcher); - ((RMContextImpl) rmContext).setStateStore(rmStore); - } - - protected RMHAProtocolService createRMHAProtocolService() { - return new RMHAProtocolService(this); + rmContext.setStateStore(rmStore); } protected RMContainerTokenSecretManager createContainerTokenSecretManager( @@ -308,20 +305,31 @@ protected void serviceInit(Configuration configuration) throws Exception { rmDispatcher = createDispatcher(); addIfService(rmDispatcher); + rmContext.setDispatcher(rmDispatcher); + + clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM(); + rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager); amRmTokenSecretManager = createAMRMTokenSecretManager(conf); + rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager); containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); addService(containerAllocationExpirer); + rmContext.setContainerAllocationExpirer(containerAllocationExpirer); AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); + rmContext.setAMLivelinessMonitor(amLivelinessMonitor); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); + rmContext.setAMFinishingMonitor(amFinishingMonitor); containerTokenSecretManager = createContainerTokenSecretManager(conf); + rmContext.setContainerTokenSecretManager(containerTokenSecretManager); + nmTokenSecretManager = createNMTokenSecretManager(conf); + rmContext.setNMTokenSecretManager(nmTokenSecretManager); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, @@ -345,24 +353,23 @@ protected void serviceInit(Configuration configuration) throws Exception { LOG.error("Failed to init state store", e); ExitUtil.terminate(1, e); } + rmContext.setStateStore(rmStore); if (UserGroupInformation.isSecurityEnabled()) { delegationTokenRenewer = createDelegationTokenRenewer(); + rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } - rmContext = new RMContextImpl( - rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager, - containerTokenSecretManager, nmTokenSecretManager, - clientToAMSecretManager); - // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); addService(nodesListManager); + rmContext.setNodesListManager(nodesListManager); // Initialize the scheduler scheduler = createScheduler(); + rmContext.setScheduler(scheduler); + schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(schedulerDispatcher); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); @@ -384,6 +391,7 @@ protected void serviceInit(Configuration configuration) throws Exception { resourceTracker = createResourceTrackerService(); addService(resourceTracker); + rmContext.setResourceTrackerService(resourceTracker); DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); @@ -399,6 +407,7 @@ protected void serviceInit(Configuration configuration) throws Exception { masterService = createApplicationMasterService(); addService(masterService) ; + rmContext.setApplicationMasterService(masterService); applicationACLsManager = new ApplicationACLsManager(conf); @@ -409,12 +418,11 @@ protected void serviceInit(Configuration configuration) throws Exception { rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext); rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager); + clientRM = createClientRMService(); rmContext.setClientRMService(clientRM); addService(clientRM); - - adminService = createAdminService(clientRM, masterService, resourceTracker); - addService(adminService); + rmContext.setClientRMService(clientRM); applicationMasterLauncher = createAMLauncher(); rmDispatcher.register(AMLauncherEventType.class, @@ -860,13 +868,8 @@ protected void createPolicyMonitors() { } } - protected AdminService createAdminService( - ClientRMService clientRMService, - ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { - return new AdminService(this.conf, scheduler, rmContext, - this.nodesListManager, clientRMService, applicationMasterService, - resourceTrackerService); + protected AdminService createAdminService() { + return new AdminService(this, rmContext); } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java index b5b590b..73c6295 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java @@ -35,7 +35,7 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class RMPolicyProvider extends PolicyProvider { - + private static final Service[] resourceManagerServices = new Service[] { new Service( @@ -53,9 +53,6 @@ new Service( YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, ContainerManagementProtocolPB.class), - new Service( - YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL, - HAServiceProtocol.class), }; @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index aba334a..82f025a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -297,16 +297,6 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) } @Override - protected RMHAProtocolService createRMHAProtocolService() { - return new RMHAProtocolService(this) { - @Override - protected void startHAAdminServer() { - // do nothing - } - }; - } - - @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager, applicationACLsManager, queueACLsManager, @@ -381,19 +371,15 @@ protected void serviceStop() { } @Override - protected AdminService createAdminService(ClientRMService clientRMService, - ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { - return new AdminService(getConfig(), scheduler, getRMContext(), - this.nodesListManager, clientRMService, applicationMasterService, - resourceTrackerService) { + protected AdminService createAdminService() { + return new AdminService(this, getRMContext()) { @Override - protected void serviceStart() { + protected void startServer() { // override to not start rpc handler } @Override - protected void serviceStop() { + protected void stopServer() { // don't do anything } }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index c783c74..7030106 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { private void checkMonitorHealth() throws IOException { try { - rm.haService.monitorHealth(); + rm.adminService.monitorHealth(); } catch (HealthCheckFailedException e) { fail("The RM is in bad health: it is Active, but the active services " + "are not running"); @@ -71,20 +71,20 @@ private void checkMonitorHealth() throws IOException { private void checkStandbyRMFunctionality() throws IOException { assertEquals(STATE_ERR, HAServiceState.STANDBY, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("Active RM services are started", rm.areActiveServicesRunning()); assertTrue("RM is not ready to become active", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); } private void checkActiveRMFunctionality() throws IOException { assertEquals(STATE_ERR, HAServiceState.ACTIVE, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertTrue("Active RM services aren't started", rm.areActiveServicesRunning()); assertTrue("RM is not ready to become active", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); try { rm.getNewAppId(); @@ -113,9 +113,9 @@ public void testStartAndTransitions() throws IOException { HAServiceProtocol.RequestSource.REQUEST_BY_USER); assertEquals(STATE_ERR, HAServiceState.INITIALIZING, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("RM is ready to become active before being started", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); checkMonitorHealth(); rm.start(); @@ -123,27 +123,27 @@ public void testStartAndTransitions() throws IOException { checkStandbyRMFunctionality(); // 1. Transition to Standby - must be a no-op - rm.haService.transitionToStandby(requestInfo); + rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); // 2. Transition to active - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); // 3. Transition to active - no-op - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); // 4. Transition to standby - rm.haService.transitionToStandby(requestInfo); + rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); // 5. Transition to active to check Active->Standby->Active works - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); @@ -151,9 +151,9 @@ public void testStartAndTransitions() throws IOException { // become active rm.stop(); assertEquals(STATE_ERR, HAServiceState.STOPPING, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("RM is ready to become active even after it is stopped", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); assertFalse("Active RM services are started", rm.areActiveServicesRunning()); checkMonitorHealth(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 2075921..295fa86 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -188,12 +188,13 @@ public void setUp() throws Exception { AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); this.rmContext = - new RMContextImpl(rmDispatcher, store, + new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new AMRMTokenSecretManager(conf), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM()); + this.rmContext.setStateStore(store); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext));