diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index b59bc25..a8dcda4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -57,7 +57,7 @@ new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); - private RMContext rmContext; + private ResourceManager rm; private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; @@ -66,9 +66,9 @@ @VisibleForTesting final Object zkDisconnectLock = new Object(); - ActiveStandbyElectorBasedElectorService(RMContext rmContext) { + ActiveStandbyElectorBasedElectorService(ResourceManager rm) { super(ActiveStandbyElectorBasedElectorService.class.getName()); - this.rmContext = rmContext; + this.rm = rm; } @Override @@ -140,7 +140,7 @@ public void becomeActive() throws ServiceFailedException { cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToActive(req); + rm.getRMContext().getRMAdminService().transitionToActive(req); } catch (Exception e) { throw new ServiceFailedException("RM could not transition to Active", e); } @@ -151,7 +151,7 @@ public void becomeStandby() { cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToStandby(req); + rm.getRMContext().getRMAdminService().transitionToStandby(req); } catch (Exception e) { LOG.error("RM could not transition to Standby", e); } @@ -205,7 +205,7 @@ public void run() { @SuppressWarnings(value = "unchecked") @Override public void notifyFatalError(String errorMessage) { - rmContext.getDispatcher().getEventHandler().handle( + rm.getRMContext().getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 7571765..3457ae3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -102,7 +102,6 @@ private static final Log LOG = LogFactory.getLog(AdminService.class); - private final RMContext rmContext; private final ResourceManager rm; private String rmId; @@ -123,16 +122,16 @@ @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; - public AdminService(ResourceManager rm, RMContext rmContext) { + public AdminService(ResourceManager rm) { super(AdminService.class.getName()); this.rm = rm; - this.rmContext = rmContext; } @Override public void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = - rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); + rm.getRMContext().isHAEnabled() + && HAUtil.isAutomaticFailoverEnabled(conf); masterServiceBindAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, @@ -189,7 +188,7 @@ protected void startServer() throws Exception { RMPolicyProvider.getInstance()); } - if (rmContext.isHAEnabled()) { + if (rm.getRMContext().isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -265,7 +264,7 @@ private void checkHaStateChange(StateChangeRequestInfo req) } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == rmContext.getHAServiceState(); + return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState(); } private void throwStandbyException() throws StandbyException { @@ -304,7 +303,7 @@ public synchronized void transitionToActive( // call all refresh*s for active RM to get the updated configurations. refreshAll(); } catch (Exception e) { - rmContext + rm.getRMContext() .getDispatcher() .getEventHandler() .handle( @@ -363,7 +362,7 @@ public synchronized void transitionToStandby( @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - HAServiceState haState = rmContext.getHAServiceState(); + HAServiceState haState = rm.getRMContext().getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); @@ -395,11 +394,12 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } private void refreshQueues() throws IOException, YarnException { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + rm.getRMContext().getScheduler().reinitialize(getConfig(), + this.rm.getRMContext()); // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); + ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); + rSystem.reinitialize(getConfig(), rm.getRMContext()); } } @@ -418,14 +418,14 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); switch (request.getDecommissionType()) { case NORMAL: - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully( + rm.getRMContext().getNodesListManager().refreshNodesGracefully( conf, request.getDecommissionTimeout()); break; case FORCEFUL: - rmContext.getNodesListManager().refreshNodesForcefully(); + rm.getRMContext().getNodesListManager().refreshNodesForcefully(); break; } RMAuditLogger.logSuccess(user.getShortUserName(), operation, @@ -440,7 +440,7 @@ private void refreshNodes() throws IOException, YarnException { Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); } @Override @@ -559,10 +559,11 @@ private void refreshActiveServicesAcls() throws IOException, YarnException { Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); - rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); - rmContext.getApplicationMasterService().refreshServiceAcls( + rm.getRMContext().getClientRMService().refreshServiceAcls(conf, + policyProvider); + rm.getRMContext().getApplicationMasterService().refreshServiceAcls( conf, policyProvider); - rmContext.getResourceTrackerService().refreshServiceAcls( + rm.getRMContext().getResourceTrackerService().refreshServiceAcls( conf, policyProvider); } @@ -601,7 +602,7 @@ public UpdateNodeResourceResponse updateNodeResource( // if any invalid nodes, throw exception instead of partially updating // valid nodes. for (NodeId nodeId : nodeIds) { - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.error("Resource update get failed on all nodes due to change " + "resource on an unrecognized node: " + nodeId); @@ -619,14 +620,14 @@ public UpdateNodeResourceResponse updateNodeResource( for (Map.Entry entry : nodeResourceMap.entrySet()) { ResourceOption newResourceOption = entry.getValue(); NodeId nodeId = entry.getKey(); - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); allSuccess = false; } else { // update resource to RMNode - this.rmContext.getDispatcher().getEventHandler() + this.rm.getRMContext().getDispatcher().getEventHandler() .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); LOG.info("Update resource on node(" + node.getNodeID() + ") with resource(" + newResourceOption.toString() + ")"); @@ -661,7 +662,8 @@ public RefreshNodesResourcesResponse refreshNodesResources( DynamicResourceConfiguration newConf; InputStream drInputStream = - this.rmContext.getConfigurationProvider().getConfigurationInputStream( + this.rm.getRMContext().getConfigurationProvider() + .getConfigurationInputStream( configuration, YarnConfiguration.DR_CONFIGURATION_FILE); if (drInputStream != null) { @@ -679,7 +681,7 @@ public RefreshNodesResourcesResponse refreshNodesResources( updateNodeResource(updateRequest); } // refresh dynamic resource in ResourceTrackerService - this.rmContext.getResourceTrackerService(). + this.rm.getRMContext().getResourceTrackerService(). updateDynamicResourceConfiguration(newConf); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -692,7 +694,8 @@ public RefreshNodesResourcesResponse refreshNodesResources( private synchronized Configuration getConfiguration(Configuration conf, String... confFileNames) throws YarnException, IOException { for (String confFileName : confFileNames) { - InputStream confFileInputStream = this.rmContext.getConfigurationProvider() + InputStream confFileInputStream = + this.rm.getRMContext().getConfigurationProvider() .getConfigurationInputStream(conf, confFileName); if (confFileInputStream != null) { conf.addResource(confFileInputStream); @@ -746,7 +749,7 @@ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLab AddToClusterNodeLabelsResponse response = recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager() + rm.getRMContext().getNodeLabelManager() .addToCluserNodeLabels(request.getNodeLabels()); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -769,7 +772,8 @@ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( RemoveFromClusterNodeLabelsResponse response = recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + rm.getRMContext().getNodeLabelManager() + .removeFromClusterNodeLabels(request.getNodeLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -805,19 +809,20 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( boolean isKnown = false; // both active and inactive nodes are recognized as known nodes if (requestedNode.getPort() != 0) { - if (rmContext.getRMNodes().containsKey(requestedNode) - || rmContext.getInactiveRMNodes().containsKey(requestedNode)) { + if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm + .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) { isKnown = true; } } else { - for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; } } if (!isKnown) { - for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes() + .keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; @@ -841,7 +846,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( } } try { - rmContext.getNodeLabelManager().replaceLabelsOnNode( + rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -878,7 +883,7 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( checkRMStatus(user.getShortUserName(), operation, msg); - Set decommissioningNodes = rmContext.getNodesListManager() + Set decommissioningNodes = rm.getRMContext().getNodesListManager() .checkForDecommissioningNodes(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -914,6 +919,6 @@ private void refreshClusterMaxPriority() throws IOException, YarnException { getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getScheduler().setClusterMaxPriority(conf); + rm.getRMContext().getScheduler().setClusterMaxPriority(conf); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java index bcdf48b..d7485f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java @@ -45,14 +45,12 @@ LogFactory.getLog(CuratorBasedElectorService.class); private LeaderLatch leaderLatch; private CuratorFramework curator; - private RMContext rmContext; private String latchPath; private String rmId; private ResourceManager rm; - public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { + public CuratorBasedElectorService(ResourceManager rm) { super(CuratorBasedElectorService.class.getName()); - this.rmContext = rmContext; this.rm = rm; } @@ -102,7 +100,8 @@ public String getZookeeperConnectionState() { public void isLeader() { LOG.info(rmId + "is elected leader, transitioning to active"); try { - rmContext.getRMAdminService().transitionToActive( + rm.getRMContext().getRMAdminService() + .transitionToActive( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { @@ -123,7 +122,8 @@ private void closeLeaderLatch() throws IOException { public void notLeader() { LOG.info(rmId + " relinquish leadership"); try { - rmContext.getRMAdminService().transitionToStandby( + rm.getRMContext().getRMAdminService() + .transitionToStandby( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index f727f55..cf59e77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -345,9 +345,9 @@ protected EmbeddedElector createEmbeddedElector() throws IOException { YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { this.curator = createAndStartCurator(conf); - elector = new CuratorBasedElectorService(rmContext, this); + elector = new CuratorBasedElectorService(this); } else { - elector = new ActiveStandbyElectorBasedElectorService(rmContext); + elector = new ActiveStandbyElectorBasedElectorService(this); } return elector; } @@ -1149,6 +1149,7 @@ void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); if (initialize) { + resetRMContext(); resetDispatcher(); createAndInitActiveServices(true); } @@ -1294,7 +1295,7 @@ protected ApplicationMasterService createApplicationMasterService() { } protected AdminService createAdminService() { - return new AdminService(this, rmContext); + return new AdminService(this); } protected RMSecretManagerService createRMSecretManagerService() { @@ -1430,6 +1431,28 @@ private void resetDispatcher() { rmContext.setDispatcher(rmDispatcher); } + private void resetRMContext() { + RMContextImpl rmContextImpl = new RMContextImpl(); + // transfer global objects into new context + rmContextImpl.setResourceManager(this); + rmContextImpl + .setConfigurationProvider(rmContext.getConfigurationProvider()); + rmContextImpl.setHAEnabled(rmContext.isHAEnabled()); + rmContextImpl.setRMAdminService(rmContext.getRMAdminService()); + rmContextImpl.setLeaderElectorService(rmContext.getLeaderElectorService()); + rmContextImpl.setYarnConfiguration(rmContext.getYarnConfiguration()); + rmContextImpl.setRMApplicationHistoryWriter( + rmContext.getRMApplicationHistoryWriter()); + if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) { + rmContextImpl.setRMTimelineCollectorManager( + rmContext.getRMTimelineCollectorManager()); + } + rmContextImpl + .setSystemMetricsPublisher(rmContext.getSystemMetricsPublisher()); + rmContextImpl.setHAServiceState(rmContext.getHAServiceState()); + rmContext = rmContextImpl; + } + private void setSchedulerRecoveryStartAndWaitTime(RMState state, Configuration conf) { if (!state.getApplicationState().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 23009db..5a215e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -1055,7 +1055,7 @@ protected void serviceStop() { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { @Override protected void startServer() { // override to not start rpc handler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 1fe9bbe..1bd15d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -122,13 +122,15 @@ private void testCallbackSynchronization(SyncTestType type) throws IOException, InterruptedException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); + ResourceManager rm = mock(ResourceManager.class); Configuration myConf = new Configuration(conf); myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); ActiveStandbyElectorBasedElectorService - ees = new ActiveStandbyElectorBasedElectorService(rc); + ees = new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); ees.enterNeutralMode(); @@ -290,7 +292,7 @@ private void testCallbackSynchronizationTimingStandby(AdminService as, @Override protected EmbeddedElector createEmbeddedElector() { - return new ActiveStandbyElectorBasedElectorService(getRMContext()) { + return new ActiveStandbyElectorBasedElectorService(this) { @Override public void becomeActive() throws ServiceFailedException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index f807217..432f750 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -592,7 +592,7 @@ public void testTransitionedToActiveRefreshFail() throws Exception { rm = new MockRM(configuration) { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { int counter = 0; @Override protected void setConfig(Configuration conf) {