diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMProxy.java new file mode 100644 index 0000000..99e69d3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMProxy.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RMProxy { + + private static final Log LOG = LogFactory.getLog(RMProxy.class); + + @SuppressWarnings("unchecked") + public static T createRMProxy(final Configuration conf, + final Class protocol, final InetSocketAddress rmAddress) + throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + T proxy = getProxy(conf, protocol, rmAddress); + FailoverProxyProvider failoverProxyProvider = + new DefaultFailoverProxyProvider( + protocol, + proxy); + return (T) RetryProxy.create(protocol, failoverProxyProvider, retryPolicy); + } + + @SuppressWarnings("unchecked") + protected static T getProxy(final Configuration conf, + final Class protocol, final InetSocketAddress rmAddress) + throws IOException { + return (T) UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, + conf); + } + }); + } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + long rmConnectWaitMS = + conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) + * 1000; + long rmConnectionRetryIntervalMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) + * 1000; + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1000); + + if (!waitForEver) { + if (rmConnectWaitMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + RetryPolicy retryPolicy = + (waitForEver) ? RetryPolicies.RETRY_FOREVER : + RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, + rmConnectionRetryIntervalMS, + TimeUnit.MILLISECONDS); + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(java.net.ConnectException.class, retryPolicy); + exceptionToPolicyMap.put(java.io.EOFException.class, retryPolicy); + return (waitForEver) ? RetryPolicies.RETRY_FOREVER : + RetryPolicies.retryByException( + retryPolicy, exceptionToPolicyMap); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMProxy.java new file mode 100644 index 0000000..39029a5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMProxy.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.yarn.util.RMProxy; + +public class LocalRMProxy { + + @SuppressWarnings("unchecked") + public static T createRMProxy(final Configuration conf, + final Class protocol, final InetSocketAddress rmAddress, T proxy) + throws IOException { + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + FailoverProxyProvider failoverProxyProvider = + new DefaultFailoverProxyProvider( + protocol, + proxy); + return (T) RetryProxy.create(protocol, failoverProxyProvider, retryPolicy); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 284cd94..fd698a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -33,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -47,7 +49,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.RMProxy; import com.google.common.annotations.VisibleForTesting; @@ -91,9 +93,6 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - private long rmConnectWaitMS; - private long rmConnectionRetryIntervalMS; - private boolean waitForEver; private Runnable statusUpdaterRunnable; private Thread statusUpdater; @@ -163,6 +162,7 @@ public void start() { this.httpPort = httpBindAddress.getPort(); // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. + this.resourceTracker = createRMProxy(getConfig()); registerWithRM(); super.start(); startStatusUpdater(); @@ -175,6 +175,9 @@ public void start() { public synchronized void stop() { // Interrupt the updater. this.isStopped = true; + if(this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } super.stop(); } @@ -205,93 +208,21 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { && isSecurityEnabled(); } - protected ResourceTracker getRMClient() { - Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress, - conf); + @VisibleForTesting + protected ResourceTracker createRMProxy(Configuration conf) throws IOException { + return RMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress); } @VisibleForTesting protected void registerWithRM() throws YarnRemoteException { - Configuration conf = getConfig(); - rmConnectWaitMS = - conf.getInt( - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, - YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) - * 1000; - rmConnectionRetryIntervalMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, - YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) - * 1000; - - if(rmConnectionRetryIntervalMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + - " should not be negative."); - } - - waitForEver = (rmConnectWaitMS == -1000); - - if(! waitForEver) { - if(rmConnectWaitMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + - " can be -1, but can not be other negative numbers"); - } - - //try connect once - if(rmConnectWaitMS < rmConnectionRetryIntervalMS) { - LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS - + " is smaller than " - + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS - + ". Only try connect once."); - rmConnectWaitMS = 0; - } - } - - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); - RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); - RegisterNodeManagerResponse regNMResponse; - - while(true) { - try { - rmRetryCount++; - LOG.info("Connecting to ResourceManager at " + this.rmAddress - + ". current no. of attempts is " + rmRetryCount); - this.resourceTracker = getRMClient(); - regNMResponse = - this.resourceTracker.registerNodeManager(request); - this.rmIdentifier = regNMResponse.getRMIdentifier(); - break; - } catch(Throwable e) { - LOG.warn("Trying to connect to ResourceManager, " + - "current no. of failed attempts is "+rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next connection retry to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to Connect to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } - } + RegisterNodeManagerResponse regNMResponse = + resourceTracker.registerNodeManager(request); + this.rmIdentifier = regNMResponse.getRMIdentifier(); // if the Resourcemanager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { throw new YarnException( @@ -430,8 +361,6 @@ public void run() { // Send heartbeat try { NodeHeartbeatResponse response = null; - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); @@ -442,31 +371,8 @@ public void run() { request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey()); } - while (!isStopped) { - try { - rmRetryCount++; - response = resourceTracker.nodeHeartbeat(request); - break; - } catch (Throwable e) { - LOG.warn("Trying to heartbeat to ResourceManager, " - + "current no. of failed attempts is " + rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next heartbeat to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to heartbeat to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } - } + response = + resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index d71334e..f280c12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -18,8 +18,13 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.LocalRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -52,8 +57,14 @@ public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, + conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTracker); } private static class MockResourceTracker implements ResourceTracker { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 292d00f..c96f379 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -21,8 +21,10 @@ import java.io.File; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.LocalRMProxy; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -91,10 +93,17 @@ public void testSuccessfulContainerLaunch() throws InterruptedException, NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { - @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; + + @Override + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, + conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + new LocalRMInterface()); + } @Override protected void startStatusUpdater() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 10dd155..a448de5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -41,8 +41,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.LocalRMProxy; import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -102,6 +101,7 @@ private NodeManager nm; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); + private InetSocketAddress rmAddress; @After public void tearDown() { @@ -253,8 +253,9 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected ResourceTracker createRMProxy(Configuration conf) throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress, + resourceTracker); } } @@ -268,10 +269,10 @@ public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected ResourceTracker createRMProxy(Configuration conf) throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress, + resourceTracker); } - } private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { @@ -286,21 +287,23 @@ public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected ResourceTracker createRMProxy(Configuration conf) throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress, + resourceTracker); } @Override protected boolean isTokenKeepAliveEnabled(Configuration conf) { return true; } + + protected ResourceTracker getResourceTracker() { + return this.resourceTracker; + } } private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = - new MyResourceTracker(this.context); private Context context; - private final long waitStartTime; private final long rmStartIntervalMS; private final boolean rmNeverStart; @@ -309,20 +312,16 @@ public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, long rmStartIntervalMS, boolean rmNeverStart) { super(context, dispatcher, healthChecker, metrics); this.context = context; - this.waitStartTime = System.currentTimeMillis(); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; } @Override - protected ResourceTracker getRMClient() { - if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS - || rmNeverStart) { - throw new YarnException("Faking RM start failure as start " + - "delay timer has not expired."); - } else { - return resourceTracker; - } + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress, + new MyResourceTracker6(this.context, rmStartIntervalMS, + rmNeverStart)); } } @@ -336,8 +335,10 @@ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, rmAddress, + resourceTracker); } } @@ -576,7 +577,62 @@ public RegisterNodeManagerResponse registerNodeManager( public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { heartBeatID++; - throw RPCUtil.getRemoteException("NodeHeartbeat exception"); + throw new YarnException(new java.net.ConnectException( + "NodeHeartbeat exception")); + } + } + + private class MyResourceTracker6 implements ResourceTracker { + + private final Context context; + private long rmStartIntervalMS; + private boolean rmNeverStart; + private final long waitStartTime; + + public MyResourceTracker6(Context context, long rmStartIntervalMS, + boolean rmNeverStart) { + this.context = context; + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + this.waitStartTime = System.currentTimeMillis(); + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS + || rmNeverStart) { + throw new YarnException(new java.net.ConnectException( + "Faking RM start failure as start " + + "delay timer has not expired.")); + } else { + NodeId nodeId = request.getNodeId(); + Resource resource = request.getResource(); + LOG.info("Registering " + nodeId.toString()); + // NOTE: this really should be checking against the config value + InetSocketAddress expected = NetUtils.getConnectAddress( + conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); + Assert.assertEquals(NetUtils.getHostPortString(expected), + nodeId.toString()); + Assert.assertEquals(5 * 1024, resource.getMemory()); + registeredNodes.add(nodeId); + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + return response; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. + newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, + null, null, 1000L); + return nhResponse; } } @@ -860,7 +916,7 @@ public void testApplicationKeepAlive() throws Exception { Thread.sleep(1000l); } MyResourceTracker3 rt = - (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); + (MyResourceTracker3) nm.getNodeStatusUpdater().getResourceTracker(); rt.context.getApplications().remove(rt.appId); Assert.assertEquals(1, rt.keepAliveRequests.size()); int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); @@ -1053,6 +1109,11 @@ private YarnConfiguration createNMConfig() { "remotelogs").toUri().getPath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0") .toUri().getPath()); + this.rmAddress = + conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); return conf; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 5fd11d5..37ed398 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.LocalRMProxy; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -101,10 +102,17 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( context, new AsyncDispatcher(), null, metrics) { + @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, + conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + new LocalRMInterface()); + } @Override protected void startStatusUpdater() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 8ab0099..c27c0b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.LocalRMProxy; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -346,14 +347,15 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { @Override - protected ResourceTracker getRMClient() { + protected ResourceTracker createRMProxy(Configuration conf) + throws IOException { final ResourceTrackerService rt = resourceManager .getResourceTrackerService(); final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); // For in-process communication without RPC - return new ResourceTracker() { + ResourceTracker resourceTracker = new ResourceTracker() { @Override public NodeHeartbeatResponse nodeHeartbeat( @@ -386,6 +388,12 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } }; + return LocalRMProxy.createRMProxy(conf, ResourceTracker.class, + conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTracker); }; }; };