diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index e65744b..74d08c9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -19,7 +19,6 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -74,7 +73,7 @@ @Private @VisibleForTesting protected YarnClient client; - private InetSocketAddress rmAddress; + private Text rmDTService; /** * Delegate responsible for communicating with the Resource Manager's @@ -91,9 +90,6 @@ public ResourceMgrDelegate(YarnConfiguration conf) { @Override protected void serviceInit(Configuration conf) throws Exception { - this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); client.init(conf); super.serviceInit(conf); } @@ -155,8 +151,12 @@ public ClusterMetrics getClusterMetrics() throws IOException, } } - InetSocketAddress getConnectAddress() { - return rmAddress; + @Override + public Text getRMDelegationTokenService() { + if (rmDTService == null) { + rmDTService = client.getRMDelegationTokenService(); + } + return rmDTService; } @SuppressWarnings("rawtypes") @@ -164,7 +164,7 @@ public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { try { return ConverterUtils.convertFromYarn( - client.getRMDelegationToken(renewer), rmAddress); + client.getRMDelegationToken(renewer), getRMDelegationTokenService()); } catch (YarnException e) { throw new IOException(e); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index ce475c1..1652d9f 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -188,8 +188,7 @@ void addHistoryToken(Credentials ts) throws IOException, InterruptedException { * to make sure we add history server delegation tokens to the credentials */ RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector(); - Text service = SecurityUtil.buildTokenService(resMgrDelegate - .getConnectAddress()); + Text service = resMgrDelegate.getRMDelegationTokenService(); if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { Text hsService = SecurityUtil.buildTokenService(hsProxy .getConnectAddress()); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 025a10f..39dec06 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -299,7 +299,7 @@ public void testGetHSDelegationToken() throws Exception { any(GetDelegationTokenRequest.class)); ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); - doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); + doReturn(rmTokenSevice).when(rmDelegate).getRMDelegationTokenService(); ClientCache clientCache = mock(ClientCache.class); doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04ecea6..98d49cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1212,4 +1212,13 @@ public static boolean useHttps(Configuration conf) { .get(YARN_HTTP_POLICY_KEY, YARN_HTTP_POLICY_DEFAULT)); } + + public static String getClusterId(Configuration conf) { + String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); + if (clusterId == null) { + throw new HadoopIllegalArgumentException("Configuration doesn't specify" + + YarnConfiguration.RM_CLUSTER_ID); + } + return clusterId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index c6db3ab..ead5c35 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -451,4 +451,15 @@ public abstract ContainerReport getContainerReport(ContainerId containerId) */ public abstract void moveApplicationAcrossQueues(ApplicationId appId, String queue) throws YarnException, IOException; + + + /** + * Get the token service name to be used for RMDelegationToken. Depending + * on whether HA is enabled or not, this method generates the appropriate + * service name as a comma-separated list of service addresses. + * + * @return - Service name for RMDelegationToken + */ + @InterfaceStability.Unstable + public abstract Text getRMDelegationTokenService(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 51a7353..55b7384 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Set; +import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -68,6 +71,7 @@ import org.apache.hadoop.yarn.client.api.AHSClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -290,7 +294,8 @@ public ApplicationReport getApplicationReport(ApplicationId appId) org.apache.hadoop.security.token.Token amrmToken = null; if (token != null) { - amrmToken = ConverterUtils.convertFromYarn(token, null); + amrmToken = ConverterUtils.convertFromYarn(token, + (InetSocketAddress) null); } return amrmToken; } @@ -487,4 +492,28 @@ public void moveApplicationAcrossQueues(ApplicationId appId, MoveApplicationAcrossQueuesRequest.newInstance(appId, queue); rmClient.moveApplicationAcrossQueues(request); } + + public Text getRMDelegationTokenService() { + Configuration conf = getConfig(); + if (HAUtil.isHAEnabled(conf)) { + // Build a list of service addresses to form the service name + ArrayList services = new ArrayList(); + YarnConfiguration yarnConf = new YarnConfiguration(conf); + for (String rmId : HAUtil.getRMHAIds(conf)) { + // Set RM_ID to get the corresponding RM_ADDRESS + yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); + services.add(SecurityUtil.buildTokenService( + yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT)).toString()); + } + return new Text(Joiner.on(',').join(services)); + } + + // Non-HA case - no need to set RM_ID + return SecurityUtil.buildTokenService( + conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT)); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 7c34966..db1c5aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.client.api.impl; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -38,6 +40,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -58,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -78,6 +82,45 @@ public void test() { } @Test + public void testGetRMDelegationTokenService() { + String defaultRMAddress = YarnConfiguration.DEFAULT_RM_ADDRESS; + YarnConfiguration conf = new YarnConfiguration(); + + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + + // HA is not enabled + Text tokenService = client.getRMDelegationTokenService(); + String[] services = tokenService.toString().split(","); + assertEquals(1, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + client.stop(); + + // HA is enabled + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), + "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), + "0.0.0.0"); + client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + tokenService = client.getRMDelegationTokenService(); + services = tokenService.toString().split(","); + assertEquals(2, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + client.stop(); + } + + @Test public void testClientStop() { Configuration conf = new Configuration(); ResourceManager rm = new ResourceManager(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index 418ccb2..5bb3dcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; @@ -139,16 +140,19 @@ public void cancel(Token token, Configuration conf) throws IOException, private static ApplicationClientProtocol getRmClient(Token token, Configuration conf) throws IOException { - InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - if (localSecretManager != null) { - // return null if it's our token - if (localServiceAddress.getAddress().isAnyLocalAddress()) { + String[] services = token.getService().toString().split(","); + for (String service : services) { + InetSocketAddress addr = NetUtils.createSocketAddr(service); + if (localSecretManager != null) { + // return null if it's our token + if (localServiceAddress.getAddress().isAnyLocalAddress()) { if (NetUtils.isLocalAddress(addr.getAddress()) && addr.getPort() == localServiceAddress.getPort()) { return null; } - } else if (addr.equals(localServiceAddress)) { - return null; + } else if (addr.equals(localServiceAddress)) { + return null; + } } } return ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java index 53f04a0..9ab2d76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenSelector.java @@ -37,6 +37,14 @@ private static final Log LOG = LogFactory .getLog(RMDelegationTokenSelector.class); + private boolean checkService(Text service, + Token token) { + if (service == null || token.getService() == null) { + return false; + } + return token.getService().toString().contains(service.toString()); + } + @SuppressWarnings("unchecked") public Token selectToken(Text service, Collection> tokens) { @@ -48,7 +56,7 @@ LOG.debug("Token kind is " + token.getKind().toString() + " and the token's service name is " + token.getService()); if (RMDelegationTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { + && checkService(service, token)) { return (Token) token; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index 5fff8f4..d3fd4e3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -216,8 +216,12 @@ public static ApplicationId toApplicationId( } /** - * Convert a protobuf token into a rpc token and set its service - * + * Convert a protobuf token into a rpc token and set its service. Supposed + * to be used for tokens other than RMDelegationToken. For + * RMDelegationToken, use + * {@link #convertFromYarn(org.apache.hadoop.yarn.api.records.Token, + * org.apache.hadoop.io.Text)} instead. + * * @param protoToken the yarn token * @param serviceAddr the connect address for the service * @return rpc token @@ -234,4 +238,22 @@ public static ApplicationId toApplicationId( } return token; } + + /** + * Convert a protobuf token into a rpc token and set its service. + * + * @param protoToken the yarn token + * @param service the service for the token + */ + public static Token convertFromYarn( + org.apache.hadoop.yarn.api.records.Token protoToken, + Text service) { + Token token = new Token(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + + token.setService(service); + return token; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 0c09c27..00bf897 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -74,11 +74,7 @@ protected synchronized void serviceInit(Configuration conf) } String rmId = HAUtil.getRMHAId(conf); - String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); - if (clusterId == null) { - throw new YarnRuntimeException(YarnConfiguration.RM_CLUSTER_ID + - " is not specified!"); - } + String clusterId = YarnConfiguration.getClusterId(conf); localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId); String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 3b5add8..5a23322 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -102,7 +103,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { org.apache.hadoop.yarn.api.records.Token delegationToken = response.getRMDelegationToken(); Token token1 = - ConverterUtils.convertFromYarn(delegationToken, null); + ConverterUtils.convertFromYarn(delegationToken, (Text) null); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); // wait for the first rollMasterKey