diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 839765c..b38f017 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -294,6 +294,11 @@ RM_WEBAPP_ADDRESS, RM_WEBAPP_HTTPS_ADDRESS)); + public static final String RM_HA_FAILOVER_PROXY_PROVIDER_CLASS = + RM_HA_PREFIX + "failover-proxy-provider.class"; + public static final String DEFAULT_RM_HA_FAILOVER_PROXY_PROVIDER_CLASS = + "org.apache.hadoop.yarn.client.ConfiguredFailoverProxyProvider"; + //////////////////////////////// // RM state store configs //////////////////////////////// @@ -848,22 +853,31 @@ public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX + "is.minicluster"; + public static final String YARN_MC_PREFIX = YARN_PREFIX + "minicluster."; + /** Whether to use fixed ports with the minicluster. */ - public static final String YARN_MINICLUSTER_FIXED_PORTS = YARN_PREFIX - + "minicluster.fixed.ports"; + public static final String YARN_MINICLUSTER_FIXED_PORTS = + YARN_MC_PREFIX + "fixed.ports"; /** * Default is false to be able to run tests concurrently without port * conflicts. */ - public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; + public static final boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; + + /** + * Whether the NM should use RPC to connect to the RM. Default is false. + * Can be set to true only when using fixed ports. 
+ */ + public static final String YARN_MINI_CLUSTER_USE_RPC = YARN_MC_PREFIX + "use-rpc"; + public static final boolean DEFAULT_YARN_MINI_CLUSTER_USE_RPC = false; /** * Whether users are explicitly trying to control resource monitoring * configuration for the MiniYARNCluster. Disabled by default. */ public static final String YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = - YARN_PREFIX + "minicluster.control-resource-monitoring"; + YARN_MC_PREFIX + "control-resource-monitoring"; public static final boolean DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 049f4cc..7b3c684 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,21 +31,35 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; public class ClientRMProxy extends RMProxy { - private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + private static ClientRMProxy INSTANCE = new ClientRMProxy(); + + private interface ClientRMProtocols 
extends ApplicationClientProtocol, + ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { + // Add nothing + } + + @SuppressWarnings("unchecked") public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); - InetSocketAddress rmAddress = getRMAddress(conf, protocol); - return createRMProxy(conf, protocol, rmAddress); + if (HAUtil.isHAEnabled(conf)) { + YarnFailoverProxyProvider provider = + YarnFailoverProxyProvider.create(conf, INSTANCE, protocol); + return createRMProxy(conf, protocol, provider); + } else { + InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } } private static void setupTokens(InetSocketAddress resourceManagerAddress) @@ -63,7 +78,8 @@ private static void setupTokens(InetSocketAddress resourceManagerAddress) } } - private static InetSocketAddress getRMAddress(YarnConfiguration conf, + @Override + public InetSocketAddress getRMAddress(YarnConfiguration conf, Class protocol) throws IOException { if (protocol == ApplicationClientProtocol.class) { return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, @@ -89,4 +105,12 @@ private static InetSocketAddress getRMAddress(YarnConfiguration conf, throw new IllegalStateException(message); } } + + @Override + public void checkAllowedProtocols(Class protocol) { + Preconditions.checkArgument( + protocol.isAssignableFrom(ClientRMProtocols.class), + "RM does not support this client protocol"); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java new file mode 100644 index 0000000..24a14c7 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class TestRMFailover { + private static final Log LOG = + 
LogFactory.getLog(TestRMFailover.class.getName()); + + private static final String RM1_NODE_ID = "rm1"; + private static final int RM1_PORT_BASE = 10000; + private static final String RM2_NODE_ID = "rm2"; + private static final int RM2_PORT_BASE = 20000; + private static final HAServiceProtocol.StateChangeRequestInfo req = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED); + + private static Configuration conf; + private static MiniYARNCluster cluster; + + private static void setConfForRM(String rmId, String prefix, String value) { + conf.set(HAUtil.addSuffix(prefix, rmId), value); + } + + private static void setRpcAddressForRM(String rmId, int base) { + setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); + } + + private static AdminService getRMAdminService(int index) { + return + cluster.getResourceManager(index).getRMContext().getRMAdminService(); + } + + @BeforeClass + public static void setup() throws IOException { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); + setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + + conf.setInt( + 
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, 10); + + + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINI_CLUSTER_USE_RPC, true); + + cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); + cluster.init(conf); + cluster.start(); + + cluster.getResourceManager(0).getRMContext().getRMAdminService() + .transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + } + + @AfterClass + public static void teardown() { + cluster.stop(); + } + + private void verifyNodeManagerConnected() + throws YarnException, InterruptedException { + ResourceManager rm = cluster.getResourceManager(); + GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); + + for (int i = 0; i < 600; i++) { + if (1 == rm.getClientRMService().getClusterMetrics(req) + .getClusterMetrics().getNumNodeManagers()) { + return; + } + Thread.sleep(100); + } + fail("NodeManager never registered with the RM"); + } + + private void verifyClientConnection() { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + try { + client.getYarnClusterMetrics(); + } catch (Exception e) { + LOG.error(e); + fail("Client couldn't connect to the Active RM"); + } finally { + client.stop(); + } + } + + @Test + public void testExplicitFailover() + throws YarnException, InterruptedException, IOException { + verifyNodeManagerConnected(); + verifyClientConnection(); + + // Failover to the second RM + getRMAdminService(0).transitionToStandby(req); + getRMAdminService(1).transitionToActive(req); + + verifyNodeManagerConnected(); + verifyClientConnection(); + + // Failover back to the first RM + getRMAdminService(1).transitionToStandby(req); getRMAdminService(0).transitionToActive(req); + verifyNodeManagerConnected(); verifyClientConnection(); + } +} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredFailoverProxyProvider.java new file mode 100644 index 0000000..d973423 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredFailoverProxyProvider.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class ConfiguredFailoverProxyProvider extends + YarnFailoverProxyProvider { + private static final Log LOG = + LogFactory.getLog(ConfiguredFailoverProxyProvider.class); + + private int currentProxyIndex = 0; + + @Override + public void performFailover(T currentProxy) { + if (LOG.isTraceEnabled()) { + LOG.trace("Performing failover " + currentProxyIndex); + } + currentProxyIndex = (currentProxyIndex + 1) % rmServiceIds.length; + conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + if (LOG.isTraceEnabled()) { + LOG.trace("Performed failover to RM " + rmServiceIds[currentProxyIndex]); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 5fff760..c4f4b2b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -26,29 +26,38 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import 
org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; -import com.google.common.annotations.VisibleForTesting; - @InterfaceAudience.Public @InterfaceStability.Evolving @SuppressWarnings("unchecked") -public class RMProxy { +public abstract class RMProxy { private static final Log LOG = LogFactory.getLog(RMProxy.class); + @Private + public abstract void checkAllowedProtocols(Class protocol); + + @Private + public abstract InetSocketAddress getRMAddress( + YarnConfiguration conf, Class protocol) throws IOException; + public static T createRMProxy(final Configuration conf, final Class protocol, InetSocketAddress rmAddress) throws IOException { RetryPolicy retryPolicy = createRetryPolicy(conf); @@ -57,6 +66,13 @@ return (T) RetryProxy.create(protocol, proxy, retryPolicy); } + public static T createRMProxy( + final Configuration conf, final Class protocol, + final FailoverProxyProvider provider) throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + return (T) RetryProxy.create(protocol, provider, retryPolicy); + } + private static T getProxy(final Configuration conf, final Class protocol, final InetSocketAddress rmAddress) throws IOException { @@ -90,10 +106,7 @@ public static RetryPolicy createRetryPolicy(Configuration conf) { } boolean waitForEver = (rmConnectWaitMS == -1); - - if (waitForEver) { - return RetryPolicies.RETRY_FOREVER; - } else { + if (!waitForEver) { if (rmConnectWaitMS < 0) { throw new YarnRuntimeException("Invalid Configuration. 
" + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS @@ -110,18 +123,41 @@ public static RetryPolicy createRetryPolicy(Configuration conf) { } } - RetryPolicy retryPolicy = - RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, - rmConnectionRetryIntervalMS, - TimeUnit.MILLISECONDS); - - Map, RetryPolicy> exceptionToPolicyMap = - new HashMap, RetryPolicy>(); - exceptionToPolicyMap.put(ConnectException.class, retryPolicy); - //TO DO: after HADOOP-9576, IOException can be changed to EOFException - exceptionToPolicyMap.put(IOException.class, retryPolicy); - - return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, - exceptionToPolicyMap); + // Handle HA case first so it includes both time-bound and + // wait-for-ever cases + if (HAUtil.isHAEnabled(conf)) { + /** + * Use {@link CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY} + * to determine number of failovers. + * + * TODO (YARN-1460): Use the YARN-specific config to calculate the number + * of failovers + */ + final int ipcClientRetries = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT); + final int ipcClientRetryInterval = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT); + final int numFailovers = waitForEver + ? 
Integer.MAX_VALUE + : (int) (rmConnectWaitMS / Math.max(1, ipcClientRetries * ipcClientRetryInterval)); + return RetryPolicies.failoverOnNetworkException(numFailovers); + } else if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } else { + RetryPolicy retryPolicy = + RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, + rmConnectionRetryIntervalMS, + TimeUnit.MILLISECONDS); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(ConnectException.class, retryPolicy); + //TO DO: after HADOOP-9576, IOException can be changed to EOFException + exceptionToPolicyMap.put(IOException.class, retryPolicy); + return RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/YarnFailoverProxyProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/YarnFailoverProxyProvider.java new file mode 100644 index 0000000..d5b293c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/YarnFailoverProxyProvider.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.Collection; + +import static org.apache.hadoop.classification.InterfaceAudience.*; + +@Private +public abstract class YarnFailoverProxyProvider implements + FailoverProxyProvider { + private static final Log LOG = LogFactory.getLog + (YarnFailoverProxyProvider.class); + + private RMProxy rmProxy; + private Class protocol; + protected YarnConfiguration conf; + protected String[] rmServiceIds; + + private static void throwInvalidDefaultClassException() { + throw new YarnRuntimeException("Invalid default failover provider class" + + YarnConfiguration.DEFAULT_RM_HA_FAILOVER_PROXY_PROVIDER_CLASS); + } + + @SuppressWarnings("unchecked") + public static YarnFailoverProxyProvider create( + Configuration conf, RMProxy rmProxy, Class protocol) { + Class defaultProviderClass = null; + try { + defaultProviderClass = (Class) + Class.forName( + YarnConfiguration.DEFAULT_RM_HA_FAILOVER_PROXY_PROVIDER_CLASS); + } catch (ClassCastException cce) { + throwInvalidDefaultClassException(); + } catch (ClassNotFoundException cnfe) { + throwInvalidDefaultClassException(); + } + + YarnFailoverProxyProvider provider = ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.RM_HA_FAILOVER_PROXY_PROVIDER_CLASS, + 
defaultProviderClass, YarnFailoverProxyProvider.class), conf); + provider.init(conf, rmProxy, protocol); + return provider; + } + + private void init(Configuration conf, RMProxy rmProxy, + Class protocol) { + this.rmProxy = rmProxy; + this.rmProxy.checkAllowedProtocols(protocol); + this.conf = new YarnConfiguration(conf); + this.protocol = protocol; + Collection rmIds = HAUtil.getRMHAIds(conf); + this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); + } + + @Override + public T getProxy() { + UserGroupInformation ugi; + final InetSocketAddress rmAddress; + try { + ugi = UserGroupInformation.getCurrentUser(); + rmAddress = this.rmProxy.getRMAddress(conf, protocol); + if (LOG.isTraceEnabled()) { + LOG.trace("Connecting to RM at " + rmAddress); + } + } catch (IOException ioe) { LOG.error("Unable to create proxy to the ResourceManager", ioe); return null; } + + if (ugi == null) { + return null; + } + + return ugi.doAs( + new PrivilegedAction() { + @SuppressWarnings("unchecked") + @Override + public T run() { + return (T) YarnRPC.create(conf).getProxy( + protocol, rmAddress, conf); + } + }); + } + + @Override + public Class getInterface() { + return protocol; + } + + @Override + public void close() throws IOException { + // do nothing + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c43dc1a..c1d8c18 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -425,6 +425,14 @@ + When HA is enabled, the class to be used by Clients, AMs and + NMs to failover to the Active RM. It should extend + org.apache.hadoop.yarn.client.YarnFailoverProxyProvider + yarn.resourcemanager.ha.failover-proxy-provider.class + org.apache.hadoop.yarn.client.ConfiguredFailoverProxyProvider + + + The maximum number of completed applications RM keeps. 
yarn.resourcemanager.max-completed-applications 10000 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index c25c597..d683a24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -21,27 +21,39 @@ import java.io.IOException; import java.net.InetSocketAddress; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.client.YarnFailoverProxyProvider; import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; public class ServerRMProxy extends RMProxy { - private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); + private static ServerRMProxy INSTANCE = new ServerRMProxy(); + + @SuppressWarnings("unchecked") public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? 
(YarnConfiguration) configuration : new YarnConfiguration(configuration); - InetSocketAddress rmAddress = getRMAddress(conf, protocol); - return createRMProxy(conf, protocol, rmAddress); + if (HAUtil.isHAEnabled(conf)) { + YarnFailoverProxyProvider provider = + YarnFailoverProxyProvider.create(conf, INSTANCE, protocol); + return createRMProxy(conf, protocol, provider); + } else { + InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } } - private static InetSocketAddress getRMAddress(YarnConfiguration conf, - Class protocol) { + @Override + public InetSocketAddress getRMAddress(YarnConfiguration conf, + Class protocol) { if (protocol == ResourceTracker.class) { return conf.getSocketAddr( YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, @@ -55,4 +67,11 @@ private static InetSocketAddress getRMAddress(YarnConfiguration conf, throw new IllegalStateException(message); } } + + @Override + public void checkAllowedProtocols(Class protocol) { + Preconditions.checkArgument( + protocol.isAssignableFrom(ResourceTracker.class), + "ResourceManager does not support this protocol"); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 9829e86..ab66e7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentMap; @@ -91,9 +92,11 @@ private NodeManager[] nodeManagers; private ResourceManager[] resourceManagers; + private String[] rmIds; + + private boolean useFixedPorts; + private boolean useRpc; - private ResourceManagerWrapper resourceManagerWrapper; - private ConcurrentMap appMasters = new ConcurrentHashMap(16, 0.75f, 2); @@ -163,15 +166,7 @@ public MiniYARNCluster( } resourceManagers = new ResourceManager[numResourceManagers]; - for (int i = 0; i < numResourceManagers; i++) { - resourceManagers[i] = new ResourceManager(); - addService(new ResourceManagerWrapper(i)); - } - nodeManagers = new CustomNodeManager[numNodeManagers]; - for(int index = 0; index < numNodeManagers; index++) { - addService(new NodeManagerWrapper(index)); - nodeManagers[index] = new CustomNodeManager(); - } + nodeManagers = new NodeManager[numNodeManagers]; } /** @@ -185,20 +180,45 @@ public MiniYARNCluster(String testName, int numNodeManagers, this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); } - @Override + @Override public void serviceInit(Configuration conf) throws Exception { + useFixedPorts = conf.getBoolean( + YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); + useRpc = conf.getBoolean(YarnConfiguration.YARN_MINI_CLUSTER_USE_RPC, + YarnConfiguration.DEFAULT_YARN_MINI_CLUSTER_USE_RPC); + + if (useRpc && !useFixedPorts) { + throw new YarnRuntimeException("Invalid configuration!" 
+ + " Minicluster can use rpc only when configured to use fixed ports"); + } + if (resourceManagers.length > 1) { conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - - StringBuilder rmIds = new StringBuilder(); - for (int i = 0; i < resourceManagers.length; i++) { - if (i != 0) { - rmIds.append(","); + if (conf.get(YarnConfiguration.RM_HA_IDS) == null) { + StringBuilder rmIds = new StringBuilder(); + for (int i = 0; i < resourceManagers.length; i++) { + if (i != 0) { + rmIds.append(","); + } + rmIds.append("rm" + i); } - rmIds.append("rm" + i); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); } - conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); + Collection rmIdsCollection = HAUtil.getRMHAIds(conf); + rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]); + } + + for (int i = 0; i < resourceManagers.length; i++) { + resourceManagers[i] = new ResourceManager(); + addService(new ResourceManagerWrapper(i)); } + for(int index = 0; index < nodeManagers.length; index++) { + nodeManagers[index] = + useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager(); + addService(new NodeManagerWrapper(index)); + } + super.serviceInit( conf instanceof YarnConfiguration ? 
conf : new YarnConfiguration(conf)); } @@ -217,7 +237,7 @@ public File getTestWorkDir() { */ @InterfaceAudience.Private @VisibleForTesting - int getActiveRMIndex() { + public int getActiveRMIndex() { if (resourceManagers.length == 1) { return 0; } @@ -292,11 +312,9 @@ private void setNonHARMConfiguration(Configuration conf) { } private void setHARMConfiguration(Configuration conf) { - String rmId = "rm" + index; String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.RM_HA_ID, rmId); for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) { - conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0"); + conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0"); } } @@ -304,15 +322,17 @@ private void setHARMConfiguration(Configuration conf) { protected synchronized void serviceInit(Configuration conf) throws Exception { conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); - if (!conf.getBoolean( - YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, - YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { + + if (!useFixedPorts) { if (HAUtil.isHAEnabled(conf)) { setHARMConfiguration(conf); } else { setNonHARMConfiguration(conf); } } + if (HAUtil.isHAEnabled(conf)) { + conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); + } resourceManagers[index].init(conf); resourceManagers[index].getRMContext().getDispatcher().register (RMAppAttemptEventType.class, @@ -498,7 +518,9 @@ protected synchronized void serviceStop() throws Exception { protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. 
} + } + private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java deleted file mode 100644 index f62124e..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java +++ /dev/null @@ -1,71 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.hadoop.yarn.server; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.AdminService; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.fail; - -public class TestMiniYARNClusterForHA { - MiniYARNCluster cluster; - - @Before - public void setup() throws IOException, InterruptedException { - Configuration conf = new YarnConfiguration(); - - cluster = new MiniYARNCluster(TestMiniYARNClusterForHA.class.getName(), - 2, 1, 1, 1); - cluster.init(conf); - cluster.start(); - - cluster.getResourceManager(0).getRMContext().getRMAdminService() - .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( - HAServiceProtocol.RequestSource.REQUEST_BY_USER)); - - assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); - } - - @Test - public void testClusterWorks() throws YarnException, InterruptedException { - ResourceManager rm = cluster.getResourceManager(0); - GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - - for (int i = 0; i < 600; i++) { - if (1 == rm.getClientRMService().getClusterMetrics(req) - .getClusterMetrics().getNumNodeManagers()) { - return; - } - Thread.sleep(100); - } - fail("NodeManager never registered with the RM"); - } -}