diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 9d0a56b468e..0ccf927a500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -151,10 +152,19 @@ public ComponentInstance(Component component, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { // Query container status for ip and host + boolean cancelOnSuccess = true; + if (compInstance.getCompSpec().getArtifact() != null && compInstance + .getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) { + // A docker container might get a different IP if the container is + // relaunched by the NM, so we need to keep checking the status. + // This is a temporary fix until the NM provides a callback for + // container relaunch (see YARN-8265). + cancelOnSuccess = false; + } compInstance.containerStatusFuture = compInstance.scheduler.executorService.scheduleAtFixedRate( new ContainerStatusRetriever(compInstance.scheduler, - event.getContainerId(), compInstance), 0, 1, + event.getContainerId(), compInstance, cancelOnSuccess), 0, 1, TimeUnit.SECONDS); long containerStartTime = System.currentTimeMillis(); try { @@ -373,14 +383,26 @@ public void updateContainerStatus(ContainerStatus status) { this.status = status; org.apache.hadoop.yarn.service.api.records.Container container = getCompSpec().getContainer(status.getContainerId().toString()); + boolean doRegistryUpdate = true; if (container != null) { - container.setIp(StringUtils.join(",", status.getIPs())); + String existingIP = container.getIp(); + String newIP = StringUtils.join(",", status.getIPs()); + container.setIp(newIP); container.setHostname(status.getHost()); - if (timelineServiceEnabled) { + if (existingIP != null && newIP.equals(existingIP)) { + doRegistryUpdate = false; + } + if (timelineServiceEnabled && doRegistryUpdate) { serviceTimelinePublisher.componentInstanceIPHostUpdated(container); } } - updateServiceRecord(yarnRegistryOperations, status); + if (doRegistryUpdate) { + cleanupRegistry(status.getContainerId()); + LOG.info( + getCompInstanceId() + " new IP = " + status.getIPs() + ", host = " + + status.getHost() + ", updating registry"); + updateServiceRecord(yarnRegistryOperations, status); + } } public String getCompName() { @@ -522,12 +544,15 @@ public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) { private NodeId nodeId; private NMClient nmClient; private ComponentInstance instance; + private boolean cancelOnSuccess; ContainerStatusRetriever(ServiceScheduler scheduler, - ContainerId containerId, ComponentInstance instance) { + ContainerId containerId, ComponentInstance instance, boolean + cancelOnSuccess) { this.containerId = containerId; this.nodeId = instance.getNodeId(); this.nmClient = scheduler.getNmClient().getClient(); this.instance = instance; + this.cancelOnSuccess = cancelOnSuccess; } @Override public void run() { ContainerStatus status = null; @@ -548,10 +573,12 @@ public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) { return; } instance.updateContainerStatus(status); - LOG.info( - instance.compInstanceId + " IP = " + status.getIPs() + ", host = " - + status.getHost() + ", cancel container status retriever"); - instance.containerStatusFuture.cancel(false); + if (cancelOnSuccess) { + LOG.info( + instance.compInstanceId + " IP = " + status.getIPs() + ", host = " + + status.getHost() + ", cancel container status retriever"); + instance.containerStatusFuture.cancel(false); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index 04b03477b63..4a75aefe058 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -317,6 +317,14 @@ public void feedFailedContainerToComp(Service service, int id, String } } + public Container updateContainerStatus(Service service, int id, + String compName, String host) { + ContainerId containerId = createContainerId(id); + Container container = createContainer(containerId, compName); + addContainerStatus(container, ContainerState.RUNNING, host); + return container; + } + public ContainerId createContainerId(int id) { ApplicationId applicationId = ApplicationId.fromString(service.getId()); return ContainerId.newContainerId( @@ -389,10 +397,15 @@ public Boolean get() { } private void addContainerStatus(Container container, ContainerState state) { + addContainerStatus(container, state, container.getNodeId().getHost()); + } + + private void addContainerStatus(Container container, ContainerState state, + String host) { ContainerStatus status = ContainerStatus.newInstance(container.getId(), state, "", 0); - status.setHost(container.getNodeId().getHost()); - status.setIPs(Lists.newArrayList(container.getNodeId().getHost())); + status.setHost(host); + status.setIPs(Lists.newArrayList(host)); containerStatuses.put(container.getId(), status); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 260976aabdd..e9478f0b7e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; @@ -349,4 +350,45 @@ public void testRecordTokensForContainers() throws Exception { am.stop(); } + + @Test + public void testIPChange() throws TimeoutException, + InterruptedException { + ApplicationId applicationId = ApplicationId.newInstance(123456, 1); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); + exampleApp.setName("testIPChange"); + Component comp1 = createComponent(comp1Name, 1, "sleep 60"); + comp1.setArtifact(new Artifact().type(Artifact.TypeEnum.DOCKER)); + exampleApp.addComponent(comp1); + + MockServiceAM am = new MockServiceAM(exampleApp); + am.init(conf); + am.start(); + + ComponentInstance comp1inst0 = am.getCompInstance(comp1Name, comp1InstName); + // allocate a container + am.feedContainerToComp(exampleApp, 1, comp1Name); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus() != null, + 2000, 200000); + // first host status will match the container nodeId + Assert.assertEquals("localhost", + comp1inst0.getContainerStatus().getHost()); + + LOG.info("Change the IP and host"); + // change the container status + am.updateContainerStatus(exampleApp, 1, comp1Name, "new.host"); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost() + .equals("new.host"), 2000, 200000); + + LOG.info("Change the IP and host again"); + // change the container status + am.updateContainerStatus(exampleApp, 1, comp1Name, "newer.host"); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost() + .equals("newer.host"), 2000, 200000); + am.stop(); + } }