diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMClient.java new file mode 100644 index 0000000..5afe058 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RMClient.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcInvocationHandler; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RMClient { + + private static final Log LOG = LogFactory.getLog(RMClient.class); + + private final Class protocol; + private final Configuration conf; + private final RetryPolicy defaultPolicy; + private T rmClient; + private InetSocketAddress rmAddress; + private long rmConnectWaitMS; + private long rmConnectionRetryIntervalMS; + private boolean waitForEver; + private Map methodNameToPolicyMap = + new HashMap(); + + + public RMClient(Class protocol, final Configuration conf) { + this.conf = conf; + this.protocol = protocol; + this.rmAddress = conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + rmConnectWaitMS = + conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) + * 1000; + rmConnectionRetryIntervalMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) + * 1000; + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + " should not be negative."); + } + + waitForEver = (rmConnectWaitMS == -1000); + + if (!waitForEver) { + if (rmConnectWaitMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + // Create default RetryPolicy which will handle ConnectionException and + // EOFException exceptions + RetryPolicy retryPolicy = + (waitForEver) ? RetryPolicies.RETRY_FOREVER : + RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, + rmConnectionRetryIntervalMS, + TimeUnit.MILLISECONDS); + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(java.net.ConnectException.class, retryPolicy); + exceptionToPolicyMap.put(java.io.EOFException.class, retryPolicy); + this.defaultPolicy = + (waitForEver) ? RetryPolicies.RETRY_FOREVER : + RetryPolicies.retryByException( + retryPolicy, exceptionToPolicyMap); + } + + @SuppressWarnings("unchecked") + public void connectToRM() throws IOException { + this.rmClient = + (T) RetryProxy.create(protocol, new DefaultFailoverProxyProvider(protocol, + getRMClient()), methodNameToPolicyMap, defaultPolicy); + } + + public Object invoke(String apiName, Class parameterType, Object[] args) + { + Method method; + try { + method = protocol.getMethod(apiName, parameterType); + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(rmClient); + return inv.invoke(rmClient, method, args); + }catch (Throwable e) { + throw new YarnException(e); + } + } + + @SuppressWarnings("unchecked") + protected T getRMClient() throws IOException { + return (T) UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, + conf); + } + }); + } + + public void disconnect() { + if(this.rmClient != null) { + RPC.stopProxy(this.rmClient); + } + } + + public void addMethodNameToPolicyMap(String name, RetryPolicy retryPolicy) { + methodNameToPolicyMap.put(name, retryPolicy); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMClient.java new file mode 100644 index 0000000..787fe30 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/LocalRMClient.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.util.RMClient; + +import com.google.common.annotations.VisibleForTesting; + +public class LocalRMClient extends RMClient { + T localRMClient; + + public LocalRMClient(Class protocol, Configuration conf, + T localRMClient) { + super(protocol, conf); + this.localRMClient = localRMClient; + } + + @VisibleForTesting + @Override + public T getRMClient() throws IOException { + return this.localRMClient; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 284cd94..15194ef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.RMClient; import com.google.common.annotations.VisibleForTesting; @@ -76,8 +76,6 @@ private NodeId nodeId; private long nextHeartBeatInterval; - private ResourceTracker resourceTracker; - private InetSocketAddress rmAddress; private Resource totalResource; private int httpPort; private boolean isStopped; @@ -91,9 +89,8 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - private long rmConnectWaitMS; - private long rmConnectionRetryIntervalMS; - private boolean waitForEver; + + private RMClient rmClient; private Runnable statusUpdaterRunnable; private Thread statusUpdater; @@ -110,7 +107,7 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, @Override public synchronized void init(Configuration conf) { - this.rmAddress = conf.getSocketAddr( + conf.getSocketAddr( YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); @@ -163,6 +160,8 @@ public void start() { this.httpPort = httpBindAddress.getPort(); // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. + rmClient = createRMClient(getConfig()); + rmClient.connectToRM(); registerWithRM(); super.start(); startStatusUpdater(); @@ -175,6 +174,7 @@ public void start() { public synchronized void stop() { // Interrupt the updater. this.isStopped = true; + rmClient.disconnect(); super.stop(); } @@ -205,93 +205,22 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { && isSecurityEnabled(); } - protected ResourceTracker getRMClient() { - Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress, - conf); + protected RMClient createRMClient(Configuration conf) { + return new RMClient(ResourceTracker.class, conf); } @VisibleForTesting protected void registerWithRM() throws YarnRemoteException { - Configuration conf = getConfig(); - rmConnectWaitMS = - conf.getInt( - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, - YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) - * 1000; - rmConnectionRetryIntervalMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, - YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) - * 1000; - - if(rmConnectionRetryIntervalMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + - " should not be negative."); - } - - waitForEver = (rmConnectWaitMS == -1000); - - if(! waitForEver) { - if(rmConnectWaitMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + - " can be -1, but can not be other negative numbers"); - } - - //try connect once - if(rmConnectWaitMS < rmConnectionRetryIntervalMS) { - LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS - + " is smaller than " - + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS - + ". Only try connect once."); - rmConnectWaitMS = 0; - } - } - - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); - RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); RegisterNodeManagerResponse regNMResponse; - - while(true) { - try { - rmRetryCount++; - LOG.info("Connecting to ResourceManager at " + this.rmAddress - + ". current no. of attempts is " + rmRetryCount); - this.resourceTracker = getRMClient(); - regNMResponse = - this.resourceTracker.registerNodeManager(request); - this.rmIdentifier = regNMResponse.getRMIdentifier(); - break; - } catch(Throwable e) { - LOG.warn("Trying to connect to ResourceManager, " + - "current no. of failed attempts is "+rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next connection retry to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to Connect to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } - } + regNMResponse = + (RegisterNodeManagerResponse) rmClient.invoke( + "registerNodeManager", RegisterNodeManagerRequest.class, new Object[] {request}); + this.rmIdentifier = regNMResponse.getRMIdentifier(); // if the Resourcemanager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { throw new YarnException( @@ -420,7 +349,6 @@ public long getRMIdentifier() { } protected void startStatusUpdater() { - statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") @@ -430,8 +358,6 @@ public void run() { // Send heartbeat try { NodeHeartbeatResponse response = null; - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); @@ -443,29 +369,14 @@ public void run() { .getContainerTokenSecretManager().getCurrentKey()); } while (!isStopped) { - try { - rmRetryCount++; - response = resourceTracker.nodeHeartbeat(request); - break; - } catch (Throwable e) { - LOG.warn("Trying to heartbeat to ResourceManager, " - + "current no. of failed attempts is " + rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next heartbeat to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to heartbeat to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } + response = + (NodeHeartbeatResponse) rmClient.invoke("nodeHeartbeat", + NodeHeartbeatRequest.class, new Object[] {request}); + break; + } + if (response == null) { + throw new YarnException("The HeartBeat response " + + "can not be null"); } //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index d71334e..9ab5105 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -20,6 +20,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.LocalRMClient; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.apache.hadoop.yarn.util.RMClient; /** * This class allows a node manager to run without without communicating with a @@ -52,8 +55,9 @@ public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + resourceTracker); } private static class MockResourceTracker implements ResourceTracker { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 292d00f..52c70ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -21,8 +21,10 @@ import java.io.File; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.LocalRMClient; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.RMClient; import org.junit.Test; import static org.mockito.Mockito.*; @@ -91,10 +94,14 @@ public void testSuccessfulContainerLaunch() throws InterruptedException, NodeManagerMetrics metrics = NodeManagerMetrics.create(); NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { - @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; + + @Override + protected RMClient + createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, + conf, + new LocalRMInterface()); + } @Override protected void startStatusUpdater() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 10dd155..945433e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -41,8 +41,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.LocalRMClient; import org.apache.hadoop.yarn.YarnException; -import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -77,6 +76,7 @@ import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.RMClient; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -253,8 +253,9 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + resourceTracker); } } @@ -268,8 +269,9 @@ public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + resourceTracker); } } @@ -286,19 +288,22 @@ public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + resourceTracker); } @Override protected boolean isTokenKeepAliveEnabled(Configuration conf) { return true; } + + protected ResourceTracker getResourceTracker() { + return this.resourceTracker; + } } private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = - new MyResourceTracker(this.context); private Context context; private final long waitStartTime; private final long rmStartIntervalMS; @@ -315,14 +320,10 @@ public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS - || rmNeverStart) { - throw new YarnException("Faking RM start failure as start " + - "delay timer has not expired."); - } else { - return resourceTracker; - } + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + new MyResourceTracker6(this.context, rmStartIntervalMS, + rmNeverStart)); } } @@ -336,8 +337,9 @@ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, } @Override - protected ResourceTracker getRMClient() { - return resourceTracker; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + resourceTracker); } } @@ -576,7 +578,62 @@ public RegisterNodeManagerResponse registerNodeManager( public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { heartBeatID++; - throw RPCUtil.getRemoteException("NodeHeartbeat exception"); + throw new YarnException(new java.net.ConnectException( + "NodeHeartbeat exception")); + } + } + + private class MyResourceTracker6 implements ResourceTracker { + + private final Context context; + private long rmStartIntervalMS; + private boolean rmNeverStart; + private final long waitStartTime; + + public MyResourceTracker6(Context context, long rmStartIntervalMS, + boolean rmNeverStart) { + this.context = context; + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + this.waitStartTime = System.currentTimeMillis(); + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS + || rmNeverStart) { + throw new YarnException(new java.net.ConnectException( + "Faking RM start failure as start " + + "delay timer has not expired.")); + } else { + NodeId nodeId = request.getNodeId(); + Resource resource = request.getResource(); + LOG.info("Registering " + nodeId.toString()); + // NOTE: this really should be checking against the config value + InetSocketAddress expected = NetUtils.getConnectAddress( + conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); + Assert.assertEquals(NetUtils.getHostPortString(expected), + nodeId.toString()); + Assert.assertEquals(5 * 1024, resource.getMemory()); + registeredNodes.add(nodeId); + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + return response; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. + newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, + null, null, 1000L); + return nhResponse; } } @@ -860,7 +917,7 @@ public void testApplicationKeepAlive() throws Exception { Thread.sleep(1000l); } MyResourceTracker3 rt = - (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); + (MyResourceTracker3) nm.getNodeStatusUpdater().getResourceTracker(); rt.context.getApplications().remove(rt.appId); Assert.assertEquals(1, rt.keepAliveRequests.size()); int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 5fd11d5..e445688 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.LocalRMClient; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.RMClient; import org.junit.After; import org.junit.Before; @@ -101,10 +103,12 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl( context, new AsyncDispatcher(), null, metrics) { + @Override - protected ResourceTracker getRMClient() { - return new LocalRMInterface(); - }; + protected RMClient createRMClient(Configuration conf) { + return new LocalRMClient(ResourceTracker.class, conf, + new LocalRMInterface()); + } @Override protected void startStatusUpdater() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 8ab0099..6dc42b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.LocalRMClient; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.util.RMClient; public class MiniYARNCluster extends CompositeService { @@ -346,14 +348,14 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics) { @Override - protected ResourceTracker getRMClient() { + protected RMClient createRMClient(Configuration conf) { final ResourceTrackerService rt = resourceManager .getResourceTrackerService(); final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); // For in-process communication without RPC - return new ResourceTracker() { + ResourceTracker resourceTracker = new ResourceTracker() { @Override public NodeHeartbeatResponse nodeHeartbeat( @@ -386,6 +388,8 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } }; + return new LocalRMClient(ResourceTracker.class, + conf, resourceTracker); }; }; };