diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java deleted file mode 100644 index 91d0bf7..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ /dev/null @@ -1,118 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.client; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; - -import com.google.common.base.Preconditions; - -public class ClientRMProxy extends RMProxy { - private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); - private static final ClientRMProxy INSTANCE = new ClientRMProxy(); - - private interface ClientRMProtocols extends ApplicationClientProtocol, - ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { - // Add nothing - } - - private ClientRMProxy(){ - super(); - } - - /** - * Create a proxy to the ResourceManager for the specified protocol. - * @param configuration Configuration with all the required information. - * @param protocol Client protocol for which proxy is being requested. - * @param Type of proxy. - * @return Proxy to the ResourceManager for the specified client protocol. - * @throws IOException - */ - public static T createRMProxy(final Configuration configuration, - final Class protocol) throws IOException { - return createRMProxy(configuration, protocol, INSTANCE); - } - - private static void setupTokens(InetSocketAddress resourceManagerAddress) - throws IOException { - // It is assumed for now that the only AMRMToken in AM's UGI is for this - // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as - // default service-address, see YARN-986. - for (Token token : UserGroupInformation - .getCurrentUser().getTokens()) { - if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - // This token needs to be directly provided to the AMs, so set the - // appropriate service-name. We'll need more infrastructure when we - // need to set it in HA case. - SecurityUtil.setTokenService(token, resourceManagerAddress); - } - } - } - - @InterfaceAudience.Private - @Override - protected InetSocketAddress getRMAddress(YarnConfiguration conf, - Class protocol) throws IOException { - if (protocol == ApplicationClientProtocol.class) { - return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - } else if (protocol == ResourceManagerAdministrationProtocol.class) { - return conf.getSocketAddr( - YarnConfiguration.RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_PORT); - } else if (protocol == ApplicationMasterProtocol.class) { - InetSocketAddress serviceAddr = - conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - setupTokens(serviceAddr); - return serviceAddr; - } else { - String message = "Unsupported protocol found when creating the proxy " + - "connection to ResourceManager: " + - ((protocol != null) ? protocol.getClass().getName() : "null"); - LOG.error(message); - throw new IllegalStateException(message); - } - } - - @InterfaceAudience.Private - @Override - protected void checkAllowedProtocols(Class protocol) { - Preconditions.checkArgument( - protocol.isAssignableFrom(ClientRMProtocols.class), - "RM does not support this client protocol"); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 96f1bbc..8900b16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -24,6 +24,8 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,13 +33,18 @@ import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -55,6 +62,8 @@ private Configuration conf; private MiniYARNCluster cluster; + private ApplicationId fakeAppId; + private void setConfForRM(String rmId, String prefix, String value) { conf.set(HAUtil.addSuffix(prefix, rmId), value); @@ -77,6 +86,7 @@ private void setRpcAddressForRM(String rmId, int base) { @Before public void setup() throws IOException { + fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0); conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); @@ -179,4 +189,67 @@ public void testAutomaticFailover() failover(); verifyConnections(); } + + @Test + public void testWebAppProxyInStandAloneMode() throws YarnException, + InterruptedException, IOException { + WebAppProxyServer webAppProxyServer = new WebAppProxyServer(); + try { + conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099"); + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + webAppProxyServer.init(conf); + + // Start webAppProxyServer + Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState()); + webAppProxyServer.start(); + Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState()); + + URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId); + HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl + .openConnection(); + + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + + explicitFailover(); + verifyConnections(); + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + } finally { + webAppProxyServer.stop(); + } + } + + @Test + public void testEmbeddedWebAppProxy() throws YarnException, + InterruptedException, IOException { + cluster.init(conf); + cluster.start(); + getAdminService(0).transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + verifyConnections(); + URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId); + HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl + .openConnection(); + + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + + explicitFailover(); + verifyConnections(); + proxyConn.connect(); + verifyExpectedException(proxyConn.getResponseMessage()); + } + + private void verifyExpectedException(String exceptionMessage){ + assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class + .getName())); + assertTrue(exceptionMessage + .contains("Application with id '" + fakeAppId + "' " + + "doesn't exist in RM.")); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java new file mode 100644 index 0000000..91d0bf7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -0,0 +1,118 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; + +import com.google.common.base.Preconditions; + +public class ClientRMProxy extends RMProxy { + private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + private static final ClientRMProxy INSTANCE = new ClientRMProxy(); + + private interface ClientRMProtocols extends ApplicationClientProtocol, + ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { + // Add nothing + } + + private ClientRMProxy(){ + super(); + } + + /** + * Create a proxy to the ResourceManager for the specified protocol. + * @param configuration Configuration with all the required information. + * @param protocol Client protocol for which proxy is being requested. + * @param Type of proxy. + * @return Proxy to the ResourceManager for the specified client protocol. + * @throws IOException + */ + public static T createRMProxy(final Configuration configuration, + final Class protocol) throws IOException { + return createRMProxy(configuration, protocol, INSTANCE); + } + + private static void setupTokens(InetSocketAddress resourceManagerAddress) + throws IOException { + // It is assumed for now that the only AMRMToken in AM's UGI is for this + // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as + // default service-address, see YARN-986. + for (Token token : UserGroupInformation + .getCurrentUser().getTokens()) { + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + // This token needs to be directly provided to the AMs, so set the + // appropriate service-name. We'll need more infrastructure when we + // need to set it in HA case. + SecurityUtil.setTokenService(token, resourceManagerAddress); + } + } + } + + @InterfaceAudience.Private + @Override + protected InetSocketAddress getRMAddress(YarnConfiguration conf, + Class protocol) throws IOException { + if (protocol == ApplicationClientProtocol.class) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } else if (protocol == ResourceManagerAdministrationProtocol.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + } else if (protocol == ApplicationMasterProtocol.class) { + InetSocketAddress serviceAddr = + conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + setupTokens(serviceAddr); + return serviceAddr; + } else { + String message = "Unsupported protocol found when creating the proxy " + + "connection to ResourceManager: " + + ((protocol != null) ? protocol.getClass().getName() : "null"); + LOG.error(message); + throw new IllegalStateException(message); + } + } + + @InterfaceAudience.Private + @Override + protected void checkAllowedProtocols(Class protocol) { + Preconditions.checkArgument( + protocol.isAssignableFrom(ClientRMProtocols.class), + "RM does not support this client protocol"); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3dabdb2..16c7ac7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -147,9 +147,12 @@ protected QueueACLsManager queueACLsManager; private DelegationTokenRenewer delegationTokenRenewer; private WebApp webApp; + private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; private boolean recoveryEnabled; + private String webAppAddress; + /** End of Active services */ private Configuration conf; @@ -194,6 +197,8 @@ protected void serviceInit(Configuration conf) throws Exception { } createAndInitActiveServices(); + webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(conf); + super.serviceInit(conf); } @@ -437,22 +442,12 @@ protected void serviceStart() throws Exception { throw e; } } - startWepApp(); - - if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - int port = webApp.port(); - WebAppUtils.setRMWebAppPort(conf, port); - } super.serviceStart(); } @Override protected void serviceStop() throws Exception { - if (webApp != null) { - webApp.stop(); - } - DefaultMetricsSystem.shutdown(); @@ -752,12 +747,16 @@ protected void startWepApp() { YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) .withHttpSpnegoKeytabKey( YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) - .at(WebAppUtils.getRMWebAppURLWithoutScheme(conf)); + .at(webAppAddress); String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). equals(proxyHostAndPort)) { - AppReportFetcher fetcher = new AppReportFetcher(conf, getClientRMService()); - builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, + if (HAUtil.isHAEnabled(conf)) { + fetcher = new AppReportFetcher(conf); + } else { + fetcher = new AppReportFetcher(conf, getClientRMService()); + } + builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher); String[] proxyParts = proxyHostAndPort.split(":"); @@ -854,6 +853,11 @@ protected void serviceStart() throws Exception { transitionToActive(); } + startWepApp(); + if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + int port = webApp.port(); + WebAppUtils.setRMWebAppPort(conf, port); + } super.serviceStart(); } @@ -864,6 +868,12 @@ protected void doSecureLogin() throws IOException { @Override protected void serviceStop() throws Exception { + if (webApp != null) { + webApp.stop(); + } + if (fetcher != null) { + fetcher.stop(); + } super.serviceStop(); transitionToStandby(false); rmContext.setHAServiceState(HAServiceState.STOPPING); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java index 077783f..5c93413 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java @@ -19,21 +19,20 @@ package org.apache.hadoop.yarn.server.webproxy; import java.io.IOException; -import java.net.InetSocketAddress; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; /** * This class abstracts away how ApplicationReports are fetched. @@ -50,16 +49,12 @@ */ public AppReportFetcher(Configuration conf) { this.conf = conf; - YarnRPC rpc = YarnRPC.create(this.conf); - InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - LOG.info("Connecting to ResourceManager at " + rmAddress); - applicationsManager = - (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class, - rmAddress, this.conf); - LOG.info("Connected to ResourceManager at " + rmAddress); + try { + applicationsManager = ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } } /** @@ -91,4 +86,10 @@ public ApplicationReport getApplicationReport(ApplicationId appId) .getApplicationReport(request); return response.getApplicationReport(); } + + public void stop() { + if (this.applicationsManager != null) { + RPC.stopProxy(this.applicationsManager); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java index 2fbb0b8..fae279a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java @@ -117,6 +117,9 @@ protected void serviceStop() throws Exception { throw new YarnRuntimeException("Error stopping proxy web server",e); } } + if(this.fetcher != null) { + this.fetcher.stop(); + } super.serviceStop(); }