diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index e65744b..7f64ed1 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -19,7 +19,6 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -74,7 +73,7 @@ @Private @VisibleForTesting protected YarnClient client; - private InetSocketAddress rmAddress; + protected Text rmDTService; /** * Delegate responsible for communicating with the Resource Manager's @@ -91,9 +90,6 @@ public ResourceMgrDelegate(YarnConfiguration conf) { @Override protected void serviceInit(Configuration conf) throws Exception { - this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); client.init(conf); super.serviceInit(conf); } @@ -155,8 +151,11 @@ public ClusterMetrics getClusterMetrics() throws IOException, } } - InetSocketAddress getConnectAddress() { - return rmAddress; + Text getRMDelegationTokenService() { + if (rmDTService == null) { + rmDTService = ConverterUtils.getRMDelegationTokenService(conf); + } + return rmDTService; } @SuppressWarnings("rawtypes") @@ -164,7 +163,7 @@ public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { try { return ConverterUtils.convertFromYarn( - client.getRMDelegationToken(renewer), rmAddress); + client.getRMDelegationToken(renewer), getRMDelegationTokenService()); } catch (YarnException e) { throw new IOException(e); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index ce475c1..1652d9f 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -188,8 +188,7 @@ void addHistoryToken(Credentials ts) throws IOException, InterruptedException { * to make sure we add history server delegation tokens to the credentials */ RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector(); - Text service = SecurityUtil.buildTokenService(resMgrDelegate - .getConnectAddress()); + Text service = resMgrDelegate.getRMDelegationTokenService(); if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { Text hsService = SecurityUtil.buildTokenService(hsProxy .getConnectAddress()); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 025a10f..a4407b8 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -271,7 +271,7 @@ public void testGetHSDelegationToken() throws Exception { // Setup mock service InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); - Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); + Text rmTokenSevice = new Text(mockRmAddress.toString()); InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200); Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress); @@ -299,7 +299,7 @@ public void testGetHSDelegationToken() throws Exception { any(GetDelegationTokenRequest.class)); ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); - doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); + doReturn(rmTokenSevice).when(rmDelegate).getRMDelegationTokenService(); ClientCache clientCache = mock(ClientCache.class); doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04ecea6..28af224 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.exceptions.YarnException; @Public @Evolving @@ -1212,4 +1213,13 @@ public static boolean useHttps(Configuration conf) { .get(YARN_HTTP_POLICY_KEY, YARN_HTTP_POLICY_DEFAULT)); } + + public static String getClusterId(Configuration conf) { + String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); + if (clusterId == null) { + throw new IllegalArgumentException("Configuration doesn't specify " + + YarnConfiguration.RM_CLUSTER_ID); + } + return clusterId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 51a7353..1253919 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -290,7 +291,8 @@ public ApplicationReport getApplicationReport(ApplicationId appId) org.apache.hadoop.security.token.Token amrmToken = null; if (token != null) { - amrmToken = ConverterUtils.convertFromYarn(token, null); + amrmToken = ConverterUtils.convertFromYarn(token, + (InetSocketAddress) null); } return amrmToken; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index 418ccb2..de7501e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; @@ -139,16 +140,21 @@ public void cancel(Token token, Configuration conf) throws IOException, private static ApplicationClientProtocol getRmClient(Token token, Configuration conf) throws IOException { - InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - if (localSecretManager != null) { - // return null if it's our token - if (localServiceAddress.getAddress().isAnyLocalAddress()) { + String[] services = token.getService().toString().split(","); + for (String service : services) { + String[] hostPort = service.split(":"); + InetSocketAddress addr = new InetSocketAddress( + hostPort[0], Integer.parseInt(hostPort[1])); + if (localSecretManager != null) { + // return null if it's our token + if (localServiceAddress.getAddress().isAnyLocalAddress()) { if (NetUtils.isLocalAddress(addr.getAddress()) && addr.getPort() == localServiceAddress.getPort()) { return null; } - } else if (addr.equals(localServiceAddress)) { - return null; + } else if (addr.equals(localServiceAddress)) { + return null; + } } } return ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java index 53f04a0..b966421 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java @@ -37,6 +37,16 @@ private static final Log LOG = LogFactory .getLog(RMDelegationTokenSelector.class); + private boolean checkService(Text service, + Token token) { + try { + return token.getService().toString().contains(service.toString()); + } catch (NullPointerException npe) { + // one of the services is null + return false; + } + } + @SuppressWarnings("unchecked") public Token selectToken(Text service, Collection> tokens) { @@ -48,7 +58,7 @@ LOG.debug("Token kind is " + token.getKind().toString() + " and the token's service name is " + token.getService()); if (RMDelegationTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { + && checkService(service, token)) { return (Token) token; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index 5fff8f4..0092398 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -23,12 +23,16 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import com.google.common.base.Joiner; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; @@ -39,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -216,8 +222,12 @@ public static ApplicationId toApplicationId( } /** - * Convert a protobuf token into a rpc token and set its service - * + * Convert a protobuf token into a rpc token and set its service. Supposed + * to be used for tokens other than RMDelegationToken. For + * RMDelegationToken, use + * {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token, + * org.apache.hadoop.io.Text)} instead. + * * @param protoToken the yarn token * @param serviceAddr the connect address for the service * @return rpc token @@ -234,4 +244,54 @@ public static ApplicationId toApplicationId( } return token; } + + /** + * Get the token service name to be used for RMDelegationToken. Depending + * on whether HA is enabled or not, this method generates the appropriate + * service name. + * + * @param conf - Configuration for the cluster + * @return - + */ + @InterfaceStability.Unstable + public static Text getRMDelegationTokenService(Configuration conf) { + ArrayList services = new ArrayList(); + if (HAUtil.isHAEnabled(conf)) { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + for (String rmId : HAUtil.getRMHAIds(conf)) { + yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); + InetSocketAddress address = + yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + services.add(SecurityUtil.buildTokenService(address).toString()); + } + } else { + services.add(conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT).toString()); + } + return new Text(Joiner.on(',').join(services)); + } + + /** + * Convert a protobuf token into a rpc token and set its service. This is + * intended primarily for RMDelegationTokens, and is expected to be used in + * conjunction with + * {@link #getRMDelegationTokenService(org.apache.hadoop.conf.Configuration)}. + * + * @param protoToken the yarn token + * @param service the service for the token + */ + public static Token convertFromYarn( + org.apache.hadoop.yarn.api.records.Token protoToken, + Text service) { + Token token = new Token(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + + token.setService(service); + return token; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java index 21af455..79f08ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java @@ -19,13 +19,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.net.URISyntaxException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.api.TestContainerId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Test; public class TestConverterUtils { @@ -58,5 +62,35 @@ public void testContainerId() throws URISyntaxException { @Test public void testContainerIdNull() throws URISyntaxException { assertNull(ConverterUtils.toString((ContainerId)null)); - } -} + } + + @Test + public void testGetRMDelegationTokenService() { + String defaultRMAddress = YarnConfiguration.DEFAULT_RM_ADDRESS; + YarnConfiguration conf = new YarnConfiguration(); + + // HA is not enabled + Text tokenService = ConverterUtils.getRMDelegationTokenService(conf); + String[] services = tokenService.toString().split(","); + assertEquals(1, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + + // HA is enabled + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), + "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), + "0.0.0.0"); + tokenService = ConverterUtils.getRMDelegationTokenService(conf); + services = tokenService.toString().split(","); + assertEquals(2, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 0c09c27..00bf897 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -74,11 +74,7 @@ protected synchronized void serviceInit(Configuration conf) } String rmId = HAUtil.getRMHAId(conf); - String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); - if (clusterId == null) { - throw new YarnRuntimeException(YarnConfiguration.RM_CLUSTER_ID + - " is not specified!"); - } + String clusterId = YarnConfiguration.getClusterId(conf); localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId); String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 3b5add8..5a23322 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -102,7 +103,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { org.apache.hadoop.yarn.api.records.Token delegationToken = response.getRMDelegationToken(); Token token1 = - ConverterUtils.convertFromYarn(delegationToken, null); + ConverterUtils.convertFromYarn(delegationToken, (Text) null); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); // wait for the first rollMasterKey