diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 512ba83..a613f93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -109,6 +110,7 @@ private final RMAppManager rmAppManager; private Server server; + private Server[] additionalServers; protected RMDelegationTokenSecretManager rmDTSecretManager; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -116,6 +118,8 @@ private final ApplicationACLsManager applicationsACLsManager; + + public ClientRMService(RMContext rmContext, YarnScheduler scheduler, RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager, RMDelegationTokenSecretManager rmDTSecretManager) { @@ -139,31 +143,59 @@ protected void serviceStart() throws Exception { YarnRPC rpc = YarnRPC.create(conf); this.server = rpc.getServer(ApplicationClientProtocol.class, this, - clientBindAddress, - conf, this.rmDTSecretManager, - conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); - + clientBindAddress, + conf, this.rmDTSecretManager, + conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); + + //process additional ports + InetSocketAddress[] addresses = NetUtils.getAdditionalPorts(conf, + YarnConfiguration.RM_ADDRESS); + + additionalServers = new Server[addresses.length]; + int i = 0; + for (InetSocketAddress address:addresses){ + additionalServers[i] = rpc.getServer(ApplicationClientProtocol.class, this, + address, + conf, this.rmDTSecretManager, + conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); + } + // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + refreshServiceAcls(conf, new RMPolicyProvider(), this.server ); + for (Server addlServer : additionalServers){ + refreshServiceAcls(conf, new RMPolicyProvider(), addlServer ); + } } - + this.server.start(); + for (Server addlServer : additionalServers){ + addlServer.start(); + } clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, - server.getListenerAddress()); + server.getListenerAddress()); // enable RM to short-circuit token operations directly to itself RMDelegationTokenIdentifier.Renewer.setSecretManager( rmDTSecretManager, clientBindAddress); + + for (InetSocketAddress address:addresses){ + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmDTSecretManager, address); + } super.serviceStart(); } @Override protected void serviceStop() throws Exception { if (this.server != null) { - this.server.stop(); + this.server.stop(); + } + for (Server addlServer: additionalServers){ + addlServer.stop(); } super.serviceStop(); } @@ -582,9 +614,9 @@ private String getRenewerForToken(Token token) : user.getShortUserName(); } - void refreshServiceAcls(Configuration configuration, - PolicyProvider policyProvider) { - this.server.refreshServiceAcl(configuration, policyProvider); + static void refreshServiceAcls(Configuration configuration, + PolicyProvider policyProvider, Server server) { + server.refreshServiceAcl(configuration, policyProvider); } private boolean isAllowedDelegationTokenOp() throws IOException {