diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index dd1060c..0107f8a 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -54,6 +54,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -71,6 +73,7 @@ import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; @@ -438,5 +441,12 @@ public StopContainersResponse stopContainers(StopContainersRequest request) "Dummy function cause")); throw new IOException(e); } + + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } } \ No newline at end of file diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 6f21c87..486321a 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -45,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -443,6 +445,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws IOException { return null; } + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } @SuppressWarnings("serial") diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java index 7aa43df..9edeecb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManagementProtocol.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -170,4 +172,10 @@ StopContainersResponse stopContainers(StopContainersRequest request) GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException, IOException; + + + @Public + @Stable + AuxServiceCheckResponse checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckRequest.java new file mode 100644 index 0000000..d3304e8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckRequest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Stable +public abstract class AuxServiceCheckRequest { + public static AuxServiceCheckRequest newInstance(ApplicationId appId) { + AuxServiceCheckRequest request = + Records.newRecord(AuxServiceCheckRequest.class); + request.setApplicationId(appId); + return request; + } + + public abstract ApplicationId getApplicationId(); + public abstract void setApplicationId(ApplicationId appId); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckResponse.java new file mode 100644 index 0000000..47cd4f2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AuxServiceCheckResponse.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.List; + +import org.apache.hadoop.yarn.util.Records; + + +public abstract class AuxServiceCheckResponse { + + public static AuxServiceCheckResponse newInstance(List auxServiceMessage) { + AuxServiceCheckResponse response = + Records.newRecord(AuxServiceCheckResponse.class); + response.setAuxServiceMessage(auxServiceMessage); + return response; + } + + public abstract List getAuxServiceMessage(); + + public abstract void setAuxServiceMessage(List auxServiceMessage); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto index 98f438a..632fbc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/containermanagement_protocol.proto @@ -33,4 +33,5 @@ service ContainerManagementProtocolService { rpc startContainers(StartContainersRequestProto) returns (StartContainersResponseProto); rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto); rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto); + rpc checkAuxServiceCheck(AuxServiceCheckRequestProto) returns (AuxServiceCheckResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index bd009e0..a6233b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -216,3 +216,11 @@ message GetContainerStatusesResponseProto { repeated ContainerStatusProto status = 1; repeated ContainerExceptionMapProto failed_requests = 2; } + +message AuxServiceCheckRequestProto { + optional ApplicationIdProto application_id = 1; +} + +message AuxServiceCheckResponseProto { + repeated string aux_service_messages = 1; +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 8914646..818c6e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -68,9 +68,13 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.AuxServiceChecker; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AuxServiceCheckerImpl; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -447,8 +451,18 @@ public boolean run() throws YarnException, IOException { resourceManager.init(conf); resourceManager.start(); + AuxServicerCheckCallbackHandler handler = + new AuxServicerCheckCallbackHandler(); + AuxServiceChecker auxServiceChecker = + new AuxServiceCheckerImpl(appAttemptID.getApplicationId(), handler); + auxServiceChecker.init(conf); + auxServiceChecker.start(); + + NMClient nmClient = new NMClientImpl(); + nmClient.registerAuxServiceChecker(auxServiceChecker); + containerListener = new NMCallbackHandler(); - nmClientAsync = new NMClientAsyncImpl(containerListener); + nmClientAsync = new NMClientAsyncImpl(nmClient, containerListener); nmClientAsync.init(conf); nmClientAsync.start(); @@ -712,6 +726,14 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { } } + private class AuxServicerCheckCallbackHandler implements + AuxServiceChecker.CallbackHandler { + + @Override + public void auxServiceFailure(List message) { + done = true; + } + } /** * Thread to connect to the {@link ContainerManagementProtocol} and launch the container * that will execute the shell command. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java index 57e7db5..b4a5bef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.AuxServiceChecker; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -118,4 +119,6 @@ public abstract ContainerStatus getContainerStatus(ContainerId containerId, * @param enabled whether the feature is enabled or not */ public abstract void cleanupRunningContainersOnStop(boolean enabled); + + public abstract void registerAuxServiceChecker(AuxServiceChecker auxServiceChecker); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AuxServiceChecker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AuxServiceChecker.java new file mode 100644 index 0000000..97da49c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AuxServiceChecker.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.async; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.async.impl.AuxServiceCheckerImpl; + +import com.google.common.annotations.VisibleForTesting; + + +public abstract class AuxServiceChecker extends AbstractService{ + + protected Map containerManagerProxies; + protected CallbackHandler callbackHandler; + protected ApplicationId appId; + + @Private + @VisibleForTesting + protected AuxServiceChecker(String name, ApplicationId appId, + CallbackHandler callbackHandler) { + super(name); + this.appId = appId; + this.callbackHandler = callbackHandler; + } + + public static AuxServiceChecker createNMClientAsync( + ApplicationId appId, CallbackHandler callbackHandler) { + return new AuxServiceCheckerImpl(appId, callbackHandler); + } + + public void addContainerManagerProxy(String containerManagerBindAddress, + ContainerManagementProtocol protocol) { + containerManagerProxies.put(containerManagerBindAddress, protocol); + } + + public void removeContainerManagerProxy(String containerManagerBindAddress) { + containerManagerProxies.remove(containerManagerBindAddress); + } + + public boolean checkContainerManagerProxy(String containerManagerBindAddress) { + return containerManagerProxies.containsKey(containerManagerBindAddress); + } + + public static interface CallbackHandler { + void auxServiceFailure(List message); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AuxServiceCheckerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AuxServiceCheckerImpl.java new file mode 100644 index 0000000..18c35e8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AuxServiceCheckerImpl.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.async.impl; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.async.AuxServiceChecker; +import com.google.common.annotations.VisibleForTesting; + +public class AuxServiceCheckerImpl extends AuxServiceChecker { + + private static final Log LOG = LogFactory.getLog(AuxServiceCheckerImpl.class); + + private AuxServiceCheckThread checkThread; + + private volatile boolean keepRunning; + + public AuxServiceCheckerImpl(ApplicationId appId, + CallbackHandler callbackHandler) { + this(AuxServiceChecker.class.getName(), appId, callbackHandler); + } + + @Private + @VisibleForTesting + protected AuxServiceCheckerImpl(String name, ApplicationId appId, + CallbackHandler callbackHandler) { + super(name, appId, callbackHandler); + this.appId = appId; + this.callbackHandler = callbackHandler; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.containerManagerProxies = + new ConcurrentHashMap(); + this.checkThread = new AuxServiceCheckThread(); + keepRunning = true; + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + checkThread.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + keepRunning = false; + checkThread.interrupt(); + try { + checkThread.join(); + } catch (InterruptedException ex) { + LOG.error("Error joining with heartbeat thread", ex); + } + checkThread.interrupt(); + super.serviceStop(); + } + + private class AuxServiceCheckThread extends Thread { + + public void run() { + while (keepRunning) { + for (ContainerManagementProtocol containerManager : containerManagerProxies + .values()) { + AuxServiceCheckRequest request = + AuxServiceCheckRequest.newInstance(appId); + try { + AuxServiceCheckResponse response = + containerManager.checkAuxServiceCheck(request); + if (response.getAuxServiceMessage() != null + && !response.getAuxServiceMessage().isEmpty()) { + callbackHandler + .auxServiceFailure(response.getAuxServiceMessage()); + } + } catch (Throwable ex) { + // ignore + } + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // ignore + } + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java index 700a509..bf263fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java @@ -90,6 +90,9 @@ public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { this(name, new NMClientImpl(), callbackHandler); } + public NMClientAsyncImpl(NMClient nmclient, CallbackHandler callbackHandler) { + this(NMClientAsync.class.getName(), nmclient, callbackHandler); + } @Private @VisibleForTesting protected NMClientAsyncImpl(String name, NMClient client, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java index b5f0be1..9591821 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AuxServiceChecker; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -90,6 +91,7 @@ //enabled by default private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true); private ContainerManagementProtocolProxy cmProxy; + private AuxServiceChecker auxServiceChecker; public NMClientImpl() { super(NMClientImpl.class.getName()); @@ -185,8 +187,9 @@ private void addStartingContainer(StartedContainer startedContainer) Map allServiceResponse; ContainerManagementProtocolProxyData proxy = null; try { + String containerManagerAddr = container.getNodeId().toString(); proxy = - cmProxy.getProxy(container.getNodeId().toString(), + cmProxy.getProxy(containerManagerAddr, container.getId()); StartContainerRequest scRequest = StartContainerRequest.newInstance(containerLaunchContext, @@ -206,6 +209,10 @@ private void addStartingContainer(StartedContainer startedContainer) } allServiceResponse = response.getAllServicesMetaData(); startingContainer.state = ContainerState.RUNNING; + if(!auxServiceChecker.checkContainerManagerProxy(containerManagerAddr)) { + auxServiceChecker.addContainerManagerProxy(containerManagerAddr, + proxy.getContainerManagementProtocol()); + } } catch (YarnException e) { startingContainer.state = ContainerState.COMPLETE; // Remove the started container if it failed to start @@ -333,4 +340,9 @@ private void parseAndThrowException(Throwable t) throws YarnException, throw (IOException) t; } } + + @Override + public void registerAuxServiceChecker(AuxServiceChecker auxServiceChecker) { + this.auxServiceChecker = auxServiceChecker; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 76e87f5..e6666d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.client.api.impl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Matchers.anyString; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -59,6 +63,7 @@ import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.async.AuxServiceChecker; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -166,7 +171,11 @@ public void setup() throws YarnException, IOException { // start am nm client nmClient = (NMClientImpl) NMClient.createNMClient(); + AuxServiceChecker mockAuxServiceChecker = mock(AuxServiceChecker.class); + when(mockAuxServiceChecker.checkContainerManagerProxy(anyString())) + .thenReturn(true); nmClient.init(conf); + nmClient.registerAuxServiceChecker(mockAuxServiceChecker); nmClient.start(); assertNotNull(nmClient); assertEquals(STATE.STARTED, nmClient.getServiceState()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java index 15397e3..6f86f08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagementProtocolPBClientImpl.java @@ -30,12 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AuxServiceCheckRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AuxServiceCheckResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto; @@ -128,4 +133,19 @@ public GetContainerStatusesResponse getContainerStatuses( return null; } } + + @Override + public AuxServiceCheckResponse checkAuxServiceCheck( + AuxServiceCheckRequest request) + throws YarnException, IOException { + AuxServiceCheckRequestProto requestProto = + ((AuxServiceCheckRequestPBImpl) request).getProto(); + try { + return new AuxServiceCheckResponsePBImpl(proxy.checkAuxServiceCheck(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java index 2d33e69..3fe6882 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ContainerManagementProtocolPBServiceImpl.java @@ -23,9 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AuxServiceCheckRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AuxServiceCheckResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; @@ -94,4 +99,19 @@ public GetContainerStatusesResponseProto getContainerStatuses( throw new ServiceException(e); } } + + @Override + public AuxServiceCheckResponseProto checkAuxServiceCheck( + RpcController arg0, AuxServiceCheckRequestProto proto) + throws ServiceException { + AuxServiceCheckRequestPBImpl request = new AuxServiceCheckRequestPBImpl(proto); + try { + AuxServiceCheckResponse response = real.checkAuxServiceCheck(request); + return ((AuxServiceCheckResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckRequestPBImpl.java new file mode 100644 index 0000000..88a3c4a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckRequestPBImpl.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckRequestProtoOrBuilder; + +public class AuxServiceCheckRequestPBImpl extends AuxServiceCheckRequest { + + AuxServiceCheckRequestProto proto = AuxServiceCheckRequestProto + .getDefaultInstance(); + AuxServiceCheckRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public AuxServiceCheckRequestPBImpl() { + builder = AuxServiceCheckRequestProto.newBuilder(); + } + + public AuxServiceCheckRequestPBImpl(AuxServiceCheckRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public AuxServiceCheckRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return getProto().toString().replaceAll("\\n", ", ") + .replaceAll("\\s+", " "); + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AuxServiceCheckRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationId getApplicationId() { + AuxServiceCheckRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) + builder.clearApplicationId(); + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckResponsePBImpl.java new file mode 100644 index 0000000..108f64c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AuxServiceCheckResponsePBImpl.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AuxServiceCheckResponseProtoOrBuilder; + +public class AuxServiceCheckResponsePBImpl extends AuxServiceCheckResponse { + + AuxServiceCheckResponseProto proto = AuxServiceCheckResponseProto + .getDefaultInstance(); + AuxServiceCheckResponseProto.Builder builder = null; + boolean viaProto = false; + + List messages = null; + + public AuxServiceCheckResponsePBImpl() { + builder = AuxServiceCheckResponseProto.newBuilder(); + } + + public AuxServiceCheckResponsePBImpl(AuxServiceCheckResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public AuxServiceCheckResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return getProto().toString().replaceAll("\\n", ", ") + .replaceAll("\\s+", " "); + } + + private void mergeLocalToBuilder() { + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AuxServiceCheckResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public List getAuxServiceMessage() { + initAuxServiceCheckMessage(); + return this.messages; + } + + @Override + public void setAuxServiceMessage(List auxServiceMessage) { + maybeInitBuilder(); + if (auxServiceMessage == null) + builder.clearAuxServiceMessages(); + this.messages = auxServiceMessage; + } + + private void initAuxServiceCheckMessage() { + if (this.messages != null) { + return; + } + AuxServiceCheckResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getAuxServiceMessagesList(); + this.messages = new ArrayList(); + this.messages.addAll(list); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index 8fe5c3c..16b818b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -33,6 +33,8 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -165,5 +167,12 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesResponse.newInstance(list, null); return null; } + + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index 76384d3..2b5ee36 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -218,6 +220,13 @@ public StopContainersResponse stopContainers(StopContainersRequest request) new Exception(EXCEPTION_CAUSE)); throw new YarnException(e); } + + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } public static ContainerTokenIdentifier newContainerTokenIdentifier( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 955ccbf..bcbb3ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -45,14 +45,16 @@ protected final Map serviceMap; protected final Map serviceMetaData; + private final ContainerManagerImpl containerManager; - public AuxServices() { + public AuxServices(ContainerManagerImpl containerManager) { super(AuxServices.class.getName()); serviceMap = Collections.synchronizedMap(new HashMap()); serviceMetaData = Collections.synchronizedMap(new HashMap()); // Obtain services from configuration in init() + this.containerManager = containerManager; } protected final synchronized void addService(String name, @@ -169,13 +171,23 @@ public void handle(AuxServicesEvent event) { // TODO kill all containers waiting on Application return; } - service.initializeApplication(new ApplicationInitializationContext(event - .getUser(), event.getApplicationID(), event.getServiceData())); + try { + service.initializeApplication(new ApplicationInitializationContext(event + .getUser(), event.getApplicationID(), event.getServiceData())); + } catch (Exception ex) { + containerManager.addToAuxiliaryServicesChecker( + event.getApplicationID(), ex.getMessage()); + } break; case APPLICATION_STOP: for (AuxiliaryService serv : serviceMap.values()) { - serv.stopApplication(new ApplicationTerminationContext(event - .getApplicationID())); + try { + serv.stopApplication(new ApplicationTerminationContext(event + .getApplicationID())); + } catch (Exception ex) { + containerManager.addToAuxiliaryServicesChecker( + event.getApplicationID(), ex.getMessage()); + } } break; default: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f8a5ea2..e71c24c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -51,6 +52,8 @@ import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -122,6 +125,9 @@ private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class); + private final Map> auxiliaryServicesChecker = + new ConcurrentHashMap>(); + final Context context; private final ContainersMonitor containersMonitor; private Server server; @@ -163,7 +169,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, this.aclsManager = aclsManager; // Start configurable services - auxiliaryServices = new AuxServices(); + auxiliaryServices = new AuxServices(this); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); @@ -767,4 +773,31 @@ public Context getContext() { return this.context; } + protected void addToAuxiliaryServicesChecker(ApplicationId appId, + String diagnostic) { + if (!auxiliaryServicesChecker.containsKey(appId)) { + ArrayList newDiagnostics = new ArrayList(); + newDiagnostics.add(diagnostic); + auxiliaryServicesChecker.put(appId, newDiagnostics); + } else { + ArrayList diagnostics = new ArrayList(); + diagnostics.add(diagnostic); + auxiliaryServicesChecker.put(appId, diagnostics); + } + } + + protected void removeFromAuxiliaryServicesChecker(ApplicationId appId) { + auxiliaryServicesChecker.remove(appId); + } + + @Override + public AuxServiceCheckResponse checkAuxServiceCheck( + AuxServiceCheckRequest request) + throws YarnException, IOException { + ArrayList auxServiceMessage = + auxiliaryServicesChecker.get(request.getApplicationId()); + AuxServiceCheckResponse response = + AuxServiceCheckResponse.newInstance(auxServiceMessage); + return response; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index fb4b69a..856f41e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -119,7 +119,7 @@ public void testAuxEventDispatch() { ServiceB.class, Service.class); conf.setInt("A.expected.init", 1); conf.setInt("B.expected.stop", 1); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(null); aux.init(conf); aux.start(); @@ -152,7 +152,7 @@ public void testAuxServices() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(null); aux.init(conf); int latch = 1; @@ -183,7 +183,7 @@ public void testAuxServicesMeta() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(null); aux.init(conf); int latch = 1; @@ -220,7 +220,7 @@ public void testAuxUnexpectedStop() { ServiceA.class, Service.class); conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); - final AuxServices aux = new AuxServices(); + final AuxServices aux = new AuxServices(null); aux.init(conf); aux.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 2c9d678..c96e675 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -313,4 +315,10 @@ synchronized public GetContainerStatusesResponse getContainerStatuses( nodeStatus.setNodeHealthStatus(nodeHealthStatus); return nodeStatus; } + + @Override + public AuxServiceCheckResponse checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java index a9f1c1a..462cb60 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java @@ -39,6 +39,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -129,6 +131,13 @@ public Credentials getContainerCredentials() throws IOException { credentials.readTokenStorageStream(buf); return credentials; } + + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } public static class MockRMWithAMS extends MockRMWithCustomAMLauncher { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 64e5cc9..572da8e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AuxServiceCheckResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -118,6 +120,13 @@ public GetContainerStatusesResponse getContainerStatuses( GetContainerStatusesRequest request) throws YarnException { return null; } + + @Override + public AuxServiceCheckResponse + checkAuxServiceCheck(AuxServiceCheckRequest request) + throws YarnException, IOException { + return null; + } } @Test