diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 74b07c2..d9cf6b5 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -70,7 +70,8 @@ @Private @VisibleForTesting protected YarnClient client; - private InetSocketAddress rmAddress; +// private InetSocketAddress rmAddress; + private String clusterId; /** * Delegate responsible for communicating with the Resource Manager's @@ -87,9 +88,14 @@ public ResourceMgrDelegate(YarnConfiguration conf) { @Override protected void serviceInit(Configuration conf) throws Exception { - this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); +// this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, +// YarnConfiguration.DEFAULT_RM_ADDRESS, +// YarnConfiguration.DEFAULT_RM_PORT); + this.clusterId = YarnConfiguration.getClusterId(conf); + if (clusterId == null) { + throw new IllegalArgumentException("Configuration doesn't have a value " + + "for " + YarnConfiguration.RM_CLUSTER_ID); + } client.init(conf); super.serviceInit(conf); } @@ -151,16 +157,16 @@ public ClusterMetrics getClusterMetrics() throws IOException, } } - InetSocketAddress getConnectAddress() { - return rmAddress; - } +// InetSocketAddress getConnectAddress() { +// return rmAddress; +// } @SuppressWarnings("rawtypes") public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { try { return ConverterUtils.convertFromYarn( - client.getRMDelegationToken(renewer), rmAddress); + client.getRMDelegationToken(renewer), clusterId); } catch (YarnException e) { throw new IOException(e); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index fc23c65..5a4d69d 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -186,8 +186,7 @@ void addHistoryToken(Credentials ts) throws IOException, InterruptedException { * to make sure we add history server delegation tokens to the credentials */ RMDelegationTokenSelector tokenSelector = new RMDelegationTokenSelector(); - Text service = SecurityUtil.buildTokenService(resMgrDelegate - .getConnectAddress()); + Text service = new Text(YarnConfiguration.getClusterId(conf)); if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { Text hsService = SecurityUtil.buildTokenService(hsProxy .getConnectAddress()); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 025a10f..66ccf70 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -123,6 +123,7 @@ public void setUp() throws Exception { resourceMgrDelegate = mock(ResourceMgrDelegate.class); conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "test-cluster"); conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM"); clientCache = new ClientCache(conf, resourceMgrDelegate); clientCache = spy(clientCache); @@ -268,10 +269,12 @@ protected void serviceStart() throws Exception { public void testGetHSDelegationToken() throws Exception { try { Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "test-cluster"); // Setup mock service - InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); - Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); +// InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); +// Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); + Text rmTokenSevice = new Text(YarnConfiguration.getClusterId(conf)); InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200); Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress); @@ -299,7 +302,7 @@ public void testGetHSDelegationToken() throws Exception { any(GetDelegationTokenRequest.class)); ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); - doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); +// doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); ClientCache clientCache = mock(ClientCache.class); doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); @@ -390,7 +393,7 @@ public Void run() throws Exception { @Test(timeout=20000) public void testAMAdminCommandOpts() throws Exception { - JobConf jobConf = new JobConf(); + JobConf jobConf = new JobConf(conf); jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true"); jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m"); @@ -456,7 +459,7 @@ public void testWarnCommandOpts() throws Exception { Appender appender = new WriterAppender(layout, bout); logger.addAppender(appender); - JobConf jobConf = new JobConf(); + JobConf jobConf = new JobConf(conf); jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo"); jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1a5f90a..0496958 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -32,6 +32,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.exceptions.YarnException; @Public @Evolving @@ -1033,4 +1034,13 @@ public InetSocketAddress updateConnectAddr(String name, } return super.updateConnectAddr(prefix, addr); } + + public static String getClusterId(Configuration conf) { + String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID); + if (clusterId == null) { + throw new IllegalArgumentException("Configuration doesn't specify " + + YarnConfiguration.RM_CLUSTER_ID); + } + return clusterId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 4a98da4..55300d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -251,7 +252,8 @@ public ApplicationReport getApplicationReport(ApplicationId appId) org.apache.hadoop.security.token.Token amrmToken = null; if (token != null) { - amrmToken = ConverterUtils.convertFromYarn(token, null); + amrmToken = ConverterUtils.convertFromYarn(token, + (InetSocketAddress) null); } return amrmToken; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java index 418ccb2..e8a8dcf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; @@ -139,15 +140,23 @@ public void cancel(Token token, Configuration conf) throws IOException, private static ApplicationClientProtocol getRmClient(Token token, Configuration conf) throws IOException { - InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - if (localSecretManager != null) { - // return null if it's our token - if (localServiceAddress.getAddress().isAnyLocalAddress()) { + try { + InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); + if (localSecretManager != null) { + // return null if it's our token + if (localServiceAddress.getAddress().isAnyLocalAddress()) { if (NetUtils.isLocalAddress(addr.getAddress()) && addr.getPort() == localServiceAddress.getPort()) { return null; } - } else if (addr.equals(localServiceAddress)) { + } else if (addr.equals(localServiceAddress)) { + return null; + } + } + } catch (IllegalArgumentException iae) { + // The token service is not a host:port authority + if (token.getService().toString().equals( + YarnConfiguration.getClusterId(conf))) { return null; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index 5fff8f4..9c4e8d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -217,7 +217,7 @@ public static ApplicationId toApplicationId( /** * Convert a protobuf token into a rpc token and set its service - * + * * @param protoToken the yarn token * @param serviceAddr the connect address for the service * @return rpc token @@ -234,4 +234,24 @@ public static ApplicationId toApplicationId( } return token; } + + /** + * Convert a protobuf token into a rpc token and set its service + * + * @param protoToken the yarn token + * @param clusterId the connect address for the service + * @return rpc token + */ + public static Token convertFromYarn( + org.apache.hadoop.yarn.api.records.Token protoToken, + String clusterId) { + Token token = new Token(protoToken.getIdentifier().array(), + protoToken.getPassword().array(), + new Text(protoToken.getKind()), + new Text(protoToken.getService())); + if (clusterId != null) { + token.setService(new Text(clusterId)); + } + return token; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java index 3b5add8..e6ac799 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java @@ -102,7 +102,7 @@ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception { org.apache.hadoop.yarn.api.records.Token delegationToken = response.getRMDelegationToken(); Token token1 = - ConverterUtils.convertFromYarn(delegationToken, null); + ConverterUtils.convertFromYarn(delegationToken, (String) null); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); // wait for the first rollMasterKey