diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 0e4cfe0..050977e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -20,11 +20,11 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.security.PrivilegedAction; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.avro.AvroRuntimeException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -49,11 +48,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; /** @@ -263,27 +261,12 @@ public void run() { protected AMRMProtocol createSchedulerProxy() { final Configuration conf = getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); - final InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - UserGroupInformation currentUser; try { - currentUser = UserGroupInformation.getCurrentUser(); + return ClientRMProxy.createRMProxy(conf, AMRMProtocol.class); } catch (IOException e) { - throw new YarnException(e); + throw new AvroRuntimeException(e); } - - // CurrentUser should already have AMToken loaded. - return currentUser.doAs(new PrivilegedAction() { - @Override - public AMRMProtocol run() { - return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, - serviceAddr, conf); - } - }); } protected abstract void heartbeat() throws Exception; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 350a1cb..92d5460 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -28,13 +26,12 @@ import java.util.TreeMap; import java.util.TreeSet; +import org.apache.avro.AvroRuntimeException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -51,7 +48,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -98,28 +94,11 @@ public synchronized void init(Configuration conf) { @Override public synchronized void start() { final YarnConfiguration conf = new YarnConfiguration(getConfig()); - final YarnRPC rpc = YarnRPC.create(conf); - final InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - - UserGroupInformation currentUser; try { - currentUser = UserGroupInformation.getCurrentUser(); + rmClient = ClientRMProxy.createRMProxy(conf, AMRMProtocol.class); } catch (IOException e) { - throw new YarnException(e); + throw new AvroRuntimeException(e); } - - // CurrentUser should already have AMToken loaded. - rmClient = currentUser.doAs(new PrivilegedAction() { - @Override - public AMRMProtocol run() { - return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, - conf); - } - }); - LOG.debug("Connecting to ResourceManager at " + rmAddress); super.start(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java new file mode 100644 index 0000000..6eda2fc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -0,0 +1,65 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.RMAdminProtocol; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class ClientRMProxy extends RMProxy{ + + private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + + public static T createRMProxy(final Configuration conf, + final Class protocol) throws IOException { + InetSocketAddress rmAddress = getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } + + private static InetSocketAddress getRMAddress(Configuration conf, Class protocol) { + if (protocol == ClientRMProtocol.class) { + return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } else if (protocol == RMAdminProtocol.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); + } else if (protocol == AMRMProtocol.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } else { + String message = "Upsupported protocol found when creating the proxy " + + "connection to ResourceManager: " + + ((protocol != null) ? protocol.getClass().getName() : "null"); + LOG.error(message); + throw new IllegalStateException(message); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/RMAdmin.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/RMAdmin.java index b9be159..9f7bb58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/RMAdmin.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/RMAdmin.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedAction; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; @@ -40,7 +38,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; public class RMAdmin extends Configured implements Tool { @@ -160,32 +157,10 @@ private static void printUsage(String cmd) { } } - private static UserGroupInformation getUGI(Configuration conf - ) throws IOException { - return UserGroupInformation.getCurrentUser(); - } - private RMAdminProtocol createAdminProtocol() throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - - // Create the client - final InetSocketAddress addr = conf.getSocketAddr( - YarnConfiguration.RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADMIN_PORT); - final YarnRPC rpc = YarnRPC.create(conf); - - RMAdminProtocol adminProtocol = - getUGI(conf).doAs(new PrivilegedAction() { - @Override - public RMAdminProtocol run() { - return (RMAdminProtocol) rpc.getProxy(RMAdminProtocol.class, - addr, conf); - } - }); - - return adminProtocol; + return ClientRMProxy.createRMProxy(conf, RMAdminProtocol.class); } private int refreshQueues() throws IOException, YarnRemoteException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index 556cb35..435bcdb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.avro.AvroRuntimeException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -58,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.Records; @@ -102,12 +101,11 @@ public synchronized void init(Configuration conf) { @Override public synchronized void start() { - YarnRPC rpc = YarnRPC.create(getConfig()); - - this.rmClient = (ClientRMProtocol) rpc.getProxy( - ClientRMProtocol.class, rmAddress, getConfig()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to ResourceManager at " + rmAddress); + try { + rmClient = + RMProxy.createRMProxy(getConfig(), ClientRMProtocol.class, rmAddress); + } catch (IOException e) { + throw new AvroRuntimeException(e); } super.start(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java new file mode 100644 index 0000000..f7f2dab --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RMProxy { + + private static final Log LOG = LogFactory.getLog(RMProxy.class); + + @SuppressWarnings("unchecked") + public static T createRMProxy(final Configuration conf, + final Class protocol, InetSocketAddress rmAddress) throws IOException { + RetryPolicy retryPolicy = createRetryPolicy(conf); + T proxy = getProxy(conf, protocol, rmAddress); + LOG.info("Connecting to ResourceManager at " + rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + + @SuppressWarnings("unchecked") + protected static T getProxy(final Configuration conf, + final Class protocol, final InetSocketAddress rmAddress) + throws IOException { + return (T) UserGroupInformation.getCurrentUser().doAs( + new PrivilegedAction() { + + @Override + public T run() { + return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf); + } + }); + } + + public static RetryPolicy createRetryPolicy(Configuration conf) { + long rmConnectWaitMS = + conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) + * 1000; + long rmConnectionRetryIntervalMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) + * 1000; + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1000); + + if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } else { + if (rmConnectWaitMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " can be -1, but can not be other negative numbers"); + } + + // try connect once + if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + ". Only try connect once."); + rmConnectWaitMS = 0; + } + } + + RetryPolicy retryPolicy = + RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, + rmConnectionRetryIntervalMS, + TimeUnit.MILLISECONDS); + + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(ConnectException.class, retryPolicy); + exceptionToPolicyMap.put(IOException.class, retryPolicy); + + return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, + exceptionToPolicyMap); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java new file mode 100644 index 0000000..111c179 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -0,0 +1,55 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class ServerRMProxy extends RMProxy{ + + private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); + + public static T createRMProxy(final Configuration conf, + final Class protocol) throws IOException { + InetSocketAddress rmAddress = getRMAddress(conf, protocol); + return createRMProxy(conf, protocol, rmAddress); + } + + private static InetSocketAddress getRMAddress(Configuration conf, Class protocol) { + if (protocol == ResourceTracker.class) { + return conf.getSocketAddr( + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + } + else { + String message = "Upsupported protocol found when creating the proxy " + + "connection to ResourceManager: " + + ((protocol != null) ? protocol.getClass().getName() : "null"); + LOG.error(message); + throw new IllegalStateException(message); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java index b638284..46704fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -41,7 +42,7 @@ import com.google.protobuf.ServiceException; -public class ResourceTrackerPBClientImpl implements ResourceTracker { +public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable { private ResourceTrackerPB proxy; @@ -50,7 +51,14 @@ public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, C proxy = (ResourceTrackerPB)RPC.getProxy( ResourceTrackerPB.class, clientVersion, addr, conf); } - + + @Override + public void close() { + if(this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnRemoteException, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b33e7b3..2585493 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -48,9 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -78,7 +80,6 @@ private NodeId nodeId; private long nextHeartBeatInterval; private ResourceTracker resourceTracker; - private InetSocketAddress rmAddress; private Resource totalResource; private int httpPort; private boolean isStopped; @@ -92,9 +93,6 @@ private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - private long rmConnectWaitMS; - private long rmConnectionRetryIntervalMS; - private boolean waitForEver; private Runnable statusUpdaterRunnable; private Thread statusUpdater; @@ -111,11 +109,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, @Override public synchronized void init(Configuration conf) { - this.rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); - int memoryMb = conf.getInt( YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB); @@ -164,6 +157,7 @@ public void start() { this.httpPort = httpBindAddress.getPort(); // Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. + this.resourceTracker = getRMClient(); registerWithRM(); super.start(); startStatusUpdater(); @@ -176,6 +170,7 @@ public void start() { public synchronized void stop() { // Interrupt the updater. this.isStopped = true; + stopRMProxy(); super.stop(); } @@ -195,6 +190,13 @@ protected void rebootNodeStatusUpdater() { } } + @VisibleForTesting + protected void stopRMProxy() { + if(this.resourceTracker != null) { + RPC.stopProxy(this.resourceTracker); + } + } + @Private protected boolean isTokenKeepAliveEnabled(Configuration conf) { return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, @@ -202,93 +204,21 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { && UserGroupInformation.isSecurityEnabled(); } - protected ResourceTracker getRMClient() { + protected ResourceTracker getRMClient() throws IOException { Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress, - conf); + return ServerRMProxy.createRMProxy(conf, ResourceTracker.class); } @VisibleForTesting protected void registerWithRM() throws YarnRemoteException, IOException { - Configuration conf = getConfig(); - rmConnectWaitMS = - conf.getInt( - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, - YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) - * 1000; - rmConnectionRetryIntervalMS = - conf.getLong( - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, - YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) - * 1000; - - if(rmConnectionRetryIntervalMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + - " should not be negative."); - } - - waitForEver = (rmConnectWaitMS == -1000); - - if(! waitForEver) { - if(rmConnectWaitMS < 0) { - throw new YarnException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + - " can be -1, but can not be other negative numbers"); - } - - //try connect once - if(rmConnectWaitMS < rmConnectionRetryIntervalMS) { - LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS - + " is smaller than " - + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS - + ". Only try connect once."); - rmConnectWaitMS = 0; - } - } - - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); - RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); - RegisterNodeManagerResponse regNMResponse; - - while(true) { - try { - rmRetryCount++; - LOG.info("Connecting to ResourceManager at " + this.rmAddress - + ". current no. of attempts is " + rmRetryCount); - this.resourceTracker = getRMClient(); - regNMResponse = - this.resourceTracker.registerNodeManager(request); - this.rmIdentifier = regNMResponse.getRMIdentifier(); - break; - } catch(Throwable e) { - LOG.warn("Trying to connect to ResourceManager, " + - "current no. of failed attempts is "+rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next connection retry to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to Connect to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } - } + RegisterNodeManagerResponse regNMResponse = + resourceTracker.registerNodeManager(request); + this.rmIdentifier = regNMResponse.getRMIdentifier(); // if the Resourcemanager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { String message = @@ -428,8 +358,6 @@ public void run() { // Send heartbeat try { NodeHeartbeatResponse response = null; - int rmRetryCount = 0; - long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); @@ -438,31 +366,8 @@ public void run() { request.setNodeStatus(nodeStatus); request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey()); - while (!isStopped) { - try { - rmRetryCount++; - response = resourceTracker.nodeHeartbeat(request); - break; - } catch (Throwable e) { - LOG.warn("Trying to heartbeat to ResourceManager, " - + "current no. of failed attempts is " + rmRetryCount); - if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS - || waitForEver) { - try { - LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 - + " seconds before next heartbeat to RM"); - Thread.sleep(rmConnectionRetryIntervalMS); - } catch(InterruptedException ex) { - //done nothing - } - } else { - String errorMessage = "Failed to heartbeat to RM, " + - "no. of failed attempts is "+rmRetryCount; - LOG.error(errorMessage,e); - throw new YarnException(errorMessage,e); - } - } - } + + response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over @@ -512,11 +417,11 @@ public void run() { dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } - } catch (YarnException e) { + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - throw e; + throw new YarnException(e); } catch (Throwable e) { // TODO Better error handling. Thread can die with the rest of the // NM still running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index 4c96d2d..7d4ec50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -61,6 +61,10 @@ public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, protected ResourceTracker getRMClient() { return resourceTracker; } + @Override + protected void stopRMProxy() { + return; + } private static class MockResourceTracker implements ResourceTracker { private int heartBeatID; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 1e61d4e..d3d0508 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -103,6 +103,11 @@ protected ResourceTracker getRMClient() { return new LocalRMInterface(); }; + @Override + protected void stopRMProxy() { + return; + } + @Override protected void startStatusUpdater() { return; // Don't start any updating thread. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 6c2f8c9..0a250bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -40,6 +40,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; @@ -51,13 +53,13 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -268,6 +270,11 @@ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, protected ResourceTracker getRMClient() { return resourceTracker; } + + @Override + protected void stopRMProxy() { + return; + } } private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { @@ -284,6 +291,10 @@ protected ResourceTracker getRMClient() { return resourceTracker; } + @Override + protected void stopRMProxy() { + return; + } } private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { @@ -301,7 +312,12 @@ public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, protected ResourceTracker getRMClient() { return resourceTracker; } - + + @Override + protected void stopRMProxy() { + return; + } + @Override protected boolean isTokenKeepAliveEnabled(Configuration conf) { return true; @@ -309,47 +325,56 @@ protected boolean isTokenKeepAliveEnabled(Configuration conf) { } private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = - new MyResourceTracker(this.context); private Context context; - private final long waitStartTime; private final long rmStartIntervalMS; private final boolean rmNeverStart; + private Configuration conf; public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, - long rmStartIntervalMS, boolean rmNeverStart) { + Configuration conf, long rmStartIntervalMS, boolean rmNeverStart) { super(context, dispatcher, healthChecker, metrics); this.context = context; - this.waitStartTime = System.currentTimeMillis(); this.rmStartIntervalMS = rmStartIntervalMS; this.rmNeverStart = rmNeverStart; + this.conf = conf; } @Override protected ResourceTracker getRMClient() { - if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS - || rmNeverStart) { - throw new YarnException("Faking RM start failure as start " + - "delay timer has not expired."); - } else { - return resourceTracker; - } + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + return (ResourceTracker) RetryProxy.create(ResourceTracker.class, + new MyResourceTracker6(this.context, rmStartIntervalMS, rmNeverStart), + retryPolicy); + } + + @Override + protected void stopRMProxy() { + return; } } private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl { private ResourceTracker resourceTracker; + private Configuration conf; public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, - NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { super(context, dispatcher, healthChecker, metrics); resourceTracker = new MyResourceTracker5(); + this.conf = conf; } @Override protected ResourceTracker getRMClient() { - return resourceTracker; + RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); + return (ResourceTracker) RetryProxy.create(ResourceTracker.class, + resourceTracker, retryPolicy); + } + + @Override + protected void stopRMProxy() { + return; } } @@ -373,15 +398,18 @@ protected MyNodeStatusUpdater3 getNodeStatusUpdater() { public boolean isStopped = false; private NodeStatusUpdater nodeStatusUpdater; private CyclicBarrier syncBarrier; - public MyNodeManager2 (CyclicBarrier syncBarrier) { + private Configuration conf; + + public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { this.syncBarrier = syncBarrier; + this.conf = conf; } @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { nodeStatusUpdater = new MyNodeStatusUpdater5(context, dispatcher, healthChecker, - metrics); + metrics, conf); return nodeStatusUpdater; } @@ -533,7 +561,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) .get(4).getState() == ContainerState.RUNNING && request.getNodeStatus().getContainersStatuses().get(4) .getContainerId().getId() == 5); - throw new YarnException("Lost the heartbeat response"); + throw new java.net.ConnectException("Lost the heartbeat response"); } else if (heartBeatID == 2) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() .size(), 7); @@ -600,7 +628,62 @@ public RegisterNodeManagerResponse registerNodeManager( public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException, IOException { heartBeatID++; - throw RPCUtil.getRemoteException("NodeHeartbeat exception"); + throw new java.net.ConnectException( + "NodeHeartbeat exception"); + } + } + + private class MyResourceTracker6 implements ResourceTracker { + + private final Context context; + private long rmStartIntervalMS; + private boolean rmNeverStart; + private final long waitStartTime; + + public MyResourceTracker6(Context context, long rmStartIntervalMS, + boolean rmNeverStart) { + this.context = context; + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + this.waitStartTime = System.currentTimeMillis(); + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException, + IOException { + if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS + || rmNeverStart) { + throw new java.net.ConnectException("Faking RM start failure as start " + + "delay timer has not expired."); + } else { + NodeId nodeId = request.getNodeId(); + Resource resource = request.getResource(); + LOG.info("Registering " + nodeId.toString()); + // NOTE: this really should be checking against the config value + InetSocketAddress expected = NetUtils.getConnectAddress( + conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); + Assert.assertEquals(NetUtils.getHostPortString(expected), + nodeId.toString()); + Assert.assertEquals(5 * 1024, resource.getMemory()); + registeredNodes.add(nodeId); + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + return response; + } + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID++); + + NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. + newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, + null, null, 1000L); + return nhResponse; } } @@ -774,7 +857,7 @@ public void testNMConnectionToRM() { final long connectionRetryIntervalSecs = 1; //Waiting for rmStartIntervalMS, RM will be started final long rmStartIntervalMS = 2*1000; - YarnConfiguration conf = createNMConfig(); + final YarnConfiguration conf = createNMConfig(); conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, connectionWaitSecs); conf.setLong(YarnConfiguration @@ -787,7 +870,7 @@ public void testNMConnectionToRM() { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( - context, dispatcher, healthChecker, metrics, + context, dispatcher, healthChecker, metrics, conf, rmStartIntervalMS, true); return nodeStatusUpdater; } @@ -813,7 +896,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( - context, dispatcher, healthChecker, metrics, rmStartIntervalMS, + context, dispatcher, healthChecker, metrics, conf, rmStartIntervalMS, false); return nodeStatusUpdater; } @@ -953,7 +1036,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown() .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, connectionRetryIntervalSecs); CyclicBarrier syncBarrier = new CyclicBarrier(2); - nm = new MyNodeManager2(syncBarrier); + nm = new MyNodeManager2(syncBarrier, conf); nm.init(conf); nm.start(); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 2e60784..6ff1bbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -112,6 +112,11 @@ protected ResourceTracker getRMClient() { }; @Override + protected void stopRMProxy() { + return; + } + + @Override protected void startStatusUpdater() { return; // Don't start any updating thread. } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 979d1c3..7d090f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -388,6 +388,11 @@ public RegisterNodeManagerResponse registerNodeManager( } }; }; + + @Override + protected void stopRMProxy() { + return; + } }; }; }