diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 9c6ff3345f0..d586813eb67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -170,6 +170,7 @@ private long finishTime = 0; private long launchAMStartTime = 0; private long launchAMEndTime = 0; + private boolean nonWorkPreservingAMContainerFinished = false; // Set to null initially. Will eventually get set // if an RMAppAttemptUnregistrationEvent occurs @@ -844,7 +845,7 @@ public float getProgress() { // A new allocate means the AM received the previously sent // finishedContainers. We can ack this to NM now - sendFinishedContainersToNM(); + sendFinishedContainersToNM(finishedContainersSentToAM); // Mark every containerStatus as being sent to AM though we may return // only the ones that belong to the current attempt @@ -1955,13 +1956,14 @@ private void sendFinishedAMContainerToNM(NodeId nodeId, } // Ack NM to remove finished containers from context. 
- private void sendFinishedContainersToNM() { - for (NodeId nodeId : finishedContainersSentToAM.keySet()) { + private void sendFinishedContainersToNM( + Map> finishedContainers) { + for (NodeId nodeId : finishedContainers.keySet()) { // Clear and get current values List currentSentContainers = - finishedContainersSentToAM.put(nodeId, - new ArrayList()); + finishedContainers.put(nodeId, + new ArrayList()); List containerIdList = new ArrayList<>(currentSentContainers.size()); for (ContainerStatus containerStatus : currentSentContainers) { @@ -1970,7 +1972,7 @@ private void sendFinishedContainersToNM() { eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, containerIdList)); } - this.finishedContainersSentToAM.clear(); + finishedContainers.clear(); } // Add am container to the list so that am container instance will be @@ -1996,7 +1998,16 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt, appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList()); appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); - appAttempt.sendFinishedContainersToNM(); + appAttempt.sendFinishedContainersToNM( + appAttempt.finishedContainersSentToAM); + // there might be some completed containers that have not been pulled + // by the AM heartbeat, explicitly add them for cleanup. 
+ appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + + // mark the fact that AM container has finished so that future finished + // containers will be cleaned up without the engagement of AM containers + // (through heartbeat) + appAttempt.nonWorkPreservingAMContainerFinished = true; } else { appAttempt.sendFinishedAMContainerToNM(nodeId, containerStatus.getContainerId()); @@ -2024,6 +2035,11 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, .getNodeId(), new ArrayList()); appAttempt.justFinishedContainers.get(containerFinishedEvent .getNodeId()).add(containerFinishedEvent.getContainerStatus()); + + if (appAttempt.nonWorkPreservingAMContainerFinished) { + // AM container has finished, so no more AM heartbeats to do the cleanup. + appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); + } } private static final class ContainerFinishedAtFinalStateTransition diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/MappingRulesDescription.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/MappingRulesDescription.java new file mode 100644 index 00000000000..eb9c4f67068 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/MappingRulesDescription.java @@ -0,0 +1,86 @@ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import 
com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonPropertyOrder({ + "rules" +}) +public class MappingRulesDescription { + + @JsonProperty("rules") + private List rules = new ArrayList(); + @JsonIgnore + private Map additionalProperties = new HashMap(); + + @JsonProperty("rules") + public List getRules() { + return rules; + } + + @JsonProperty("rules") + public void setRules(List rules) { + this.rules = rules; + } + + @JsonAnyGetter + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @JsonAnySetter + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(MappingRulesDescription.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('['); + sb.append("rules"); + sb.append('='); + sb.append(((this.rules == null)?"":this.rules)); + sb.append(','); + sb.append("additionalProperties"); + sb.append('='); + sb.append(((this.additionalProperties == null)?"":this.additionalProperties)); + sb.append(','); + if (sb.charAt((sb.length()- 1)) == ',') { + sb.setCharAt((sb.length()- 1), ']'); + } else { + sb.append(']'); + } + return sb.toString(); + } + + @Override + public int hashCode() { + int result = 1; + result = ((result* 31)+((this.rules == null)? 0 :this.rules.hashCode())); + result = ((result* 31)+((this.additionalProperties == null)? 
0 :this.additionalProperties.hashCode())); + return result; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if ((other instanceof MappingRulesDescription) == false) { + return false; + } + MappingRulesDescription rhs = ((MappingRulesDescription) other); + return (((this.rules == rhs.rules)||((this.rules!= null)&&this.rules.equals(rhs.rules)))&&((this.additionalProperties == rhs.additionalProperties)||((this.additionalProperties!= null)&&this.additionalProperties.equals(rhs.additionalProperties)))); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/Rule.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/Rule.java new file mode 100644 index 00000000000..4e0ede355e4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/Rule.java @@ -0,0 +1,340 @@ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema; + +import java.util.HashMap; +import java.util.Map; +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonValue; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonPropertyOrder({ + "type", + "matches", + "policy", + "parentQueue", + "fallbackResult", + "create", + "value", + 
"customPlacement" +}) +public class Rule { + + @JsonProperty("type") + private Rule.Type type; + @JsonProperty("matches") + private String matches; + @JsonProperty("policy") + private Rule.Policy policy; + @JsonProperty("parentQueue") + private String parentQueue; + @JsonProperty("fallbackResult") + private Rule.FallbackResult fallbackResult; + @JsonProperty("create") + private Boolean create; + @JsonProperty("value") + private String value; + @JsonProperty("customPlacement") + private String customPlacement; + @JsonIgnore + private Map additionalProperties = new HashMap(); + + @JsonProperty("type") + public Rule.Type getType() { + return type; + } + + @JsonProperty("type") + public void setType(Rule.Type type) { + this.type = type; + } + + @JsonProperty("matches") + public String getMatches() { + return matches; + } + + @JsonProperty("matches") + public void setMatches(String matches) { + this.matches = matches; + } + + @JsonProperty("policy") + public Rule.Policy getPolicy() { + return policy; + } + + @JsonProperty("policy") + public void setPolicy(Rule.Policy policy) { + this.policy = policy; + } + + @JsonProperty("parentQueue") + public String getParentQueue() { + return parentQueue; + } + + @JsonProperty("parentQueue") + public void setParentQueue(String parentQueue) { + this.parentQueue = parentQueue; + } + + @JsonProperty("fallbackResult") + public Rule.FallbackResult getFallbackResult() { + return fallbackResult; + } + + @JsonProperty("fallbackResult") + public void setFallbackResult(Rule.FallbackResult fallbackResult) { + this.fallbackResult = fallbackResult; + } + + @JsonProperty("create") + public Boolean getCreate() { + return create; + } + + @JsonProperty("create") + public void setCreate(Boolean create) { + this.create = create; + } + + @JsonProperty("value") + public String getValue() { + return value; + } + + @JsonProperty("value") + public void setValue(String value) { + this.value = value; + } + + @JsonProperty("customPlacement") + public String 
getCustomPlacement() { + return customPlacement; + } + + @JsonProperty("customPlacement") + public void setCustomPlacement(String customPlacement) { + this.customPlacement = customPlacement; + } + + @JsonAnyGetter + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @JsonAnySetter + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(Rule.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('['); + sb.append("type"); + sb.append('='); + sb.append(((this.type == null)?"":this.type)); + sb.append(','); + sb.append("matches"); + sb.append('='); + sb.append(((this.matches == null)?"":this.matches)); + sb.append(','); + sb.append("policy"); + sb.append('='); + sb.append(((this.policy == null)?"":this.policy)); + sb.append(','); + sb.append("parentQueue"); + sb.append('='); + sb.append(((this.parentQueue == null)?"":this.parentQueue)); + sb.append(','); + sb.append("fallbackResult"); + sb.append('='); + sb.append(((this.fallbackResult == null)?"":this.fallbackResult)); + sb.append(','); + sb.append("create"); + sb.append('='); + sb.append(((this.create == null)?"":this.create)); + sb.append(','); + sb.append("value"); + sb.append('='); + sb.append(((this.value == null)?"":this.value)); + sb.append(','); + sb.append("customPlacement"); + sb.append('='); + sb.append(((this.customPlacement == null)?"":this.customPlacement)); + sb.append(','); + sb.append("additionalProperties"); + sb.append('='); + sb.append(((this.additionalProperties == null)?"":this.additionalProperties)); + sb.append(','); + if (sb.charAt((sb.length()- 1)) == ',') { + sb.setCharAt((sb.length()- 1), ']'); + } else { + sb.append(']'); + } + return sb.toString(); + } + + @Override + public int hashCode() { + int result = 1; + result = ((result* 31)+((this.fallbackResult == 
null)? 0 :this.fallbackResult.hashCode())); + result = ((result* 31)+((this.customPlacement == null)? 0 :this.customPlacement.hashCode())); + result = ((result* 31)+((this.create == null)? 0 :this.create.hashCode())); + result = ((result* 31)+((this.parentQueue == null)? 0 :this.parentQueue.hashCode())); + result = ((result* 31)+((this.additionalProperties == null)? 0 :this.additionalProperties.hashCode())); + result = ((result* 31)+((this.type == null)? 0 :this.type.hashCode())); + result = ((result* 31)+((this.matches == null)? 0 :this.matches.hashCode())); + result = ((result* 31)+((this.value == null)? 0 :this.value.hashCode())); + result = ((result* 31)+((this.policy == null)? 0 :this.policy.hashCode())); + return result; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if ((other instanceof Rule) == false) { + return false; + } + Rule rhs = ((Rule) other); + return ((((((((((this.fallbackResult == rhs.fallbackResult)||((this.fallbackResult!= null)&&this.fallbackResult.equals(rhs.fallbackResult)))&&((this.customPlacement == rhs.customPlacement)||((this.customPlacement!= null)&&this.customPlacement.equals(rhs.customPlacement))))&&((this.create == rhs.create)||((this.create!= null)&&this.create.equals(rhs.create))))&&((this.parentQueue == rhs.parentQueue)||((this.parentQueue!= null)&&this.parentQueue.equals(rhs.parentQueue))))&&((this.additionalProperties == rhs.additionalProperties)||((this.additionalProperties!= null)&&this.additionalProperties.equals(rhs.additionalProperties))))&&((this.type == rhs.type)||((this.type!= null)&&this.type.equals(rhs.type))))&&((this.matches == rhs.matches)||((this.matches!= null)&&this.matches.equals(rhs.matches))))&&((this.value == rhs.value)||((this.value!= null)&&this.value.equals(rhs.value))))&&((this.policy == rhs.policy)||((this.policy!= null)&&this.policy.equals(rhs.policy)))); + } + + public enum FallbackResult { + + SKIP("skip"), + REJECT("reject"), + 
PLACE_DEFAULT("placeDefault"); + private final String value; + private final static Map CONSTANTS = new HashMap(); + + static { + for (Rule.FallbackResult c: values()) { + CONSTANTS.put(c.value, c); + } + } + + private FallbackResult(String value) { + this.value = value; + } + + @Override + public String toString() { + return this.value; + } + + @JsonValue + public String value() { + return this.value; + } + + @JsonCreator + public static Rule.FallbackResult fromValue(String value) { + Rule.FallbackResult constant = CONSTANTS.get(value); + if (constant == null) { + throw new IllegalArgumentException(value); + } else { + return constant; + } + } + + } + + public enum Policy { + + SPECIFIED("specified"), + REJECT("reject"), + DEFAULT_QUEUE("defaultQueue"), + USER("user"), + PRIMARY_GROUP("primaryGroup"), + SECONDARY_GROUP("secondaryGroup"), + PRIMARY_GROUP_USER("primaryGroupUser"), + SECONDARY_GROUP_USER("secondaryGroupUser"), + APPLICATION_NAME("applicationName"), + SET_DEFAULT_QUEUE("setDefaultQueue"), + CUSTOM("custom"); + private final String value; + private final static Map CONSTANTS = new HashMap(); + + static { + for (Rule.Policy c: values()) { + CONSTANTS.put(c.value, c); + } + } + + private Policy(String value) { + this.value = value; + } + + @Override + public String toString() { + return this.value; + } + + @JsonValue + public String value() { + return this.value; + } + + @JsonCreator + public static Rule.Policy fromValue(String value) { + Rule.Policy constant = CONSTANTS.get(value); + if (constant == null) { + throw new IllegalArgumentException(value); + } else { + return constant; + } + } + + } + + public enum Type { + + USER("user"), + GROUP("group"), + APPLICATION("application"); + private final String value; + private final static Map CONSTANTS = new HashMap(); + + static { + for (Rule.Type c: values()) { + CONSTANTS.put(c.value, c); + } + } + + private Type(String value) { + this.value = value; + } + + @Override + public String toString() { + return 
this.value; + } + + @JsonValue + public String value() { + return this.value; + } + + @JsonCreator + public static Rule.Type fromValue(String value) { + Rule.Type constant = CONSTANTS.get(value); + if (constant == null) { + throw new IllegalArgumentException(value); + } else { + return constant; + } + } + + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 0e4f30810b0..cdbdbe42bf5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -649,6 +649,8 @@ private Container allocateApplicationAttempt() { RMContainer rmContainer = mock(RMContainerImpl.class); when(scheduler.getRMContainer(container.getId())). thenReturn(rmContainer); + when(container.getNodeId()).thenReturn( + BuilderUtils.newNodeId("localhost", 0)); applicationAttempt.handle( new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), @@ -1486,6 +1488,119 @@ public void testFinishedContainer() { .handle(Mockito.any(RMNodeEvent.class)); } + /** + * Check a completed container that is not yet pulled by AM heartbeat, + * is ACKed to NM for cleanup when the AM container exits. 
+ */ + @Test + public void testFinishedContainerNotBeingPulledByAMHeartbeat() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), amContainer.getNodeId())); + + // Complete a non-AM container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn( + containerId1); + when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + // Verify justFinishedContainers + ArgumentCaptor captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers() + .size()); + Assert.assertEquals(container1.getId(), applicationAttempt + .getJustFinishedContainers().get(0).getContainerId()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + + // finish AM container to emulate AM exit event + containerStatus1 = mock(ContainerStatus.class); + ContainerId amContainerId = amContainer.getId(); + when(containerStatus1.getContainerId()).thenReturn(amContainerId); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + amContainer.getNodeId())); + + Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture()); + List containerPulledEvents = + captor.getAllValues(); + // Verify AM container is acked to NM via the RMNodeEvent immediately + 
Assert.assertEquals(amContainer.getId(), + containerPulledEvents.get(0).getContainers().get(0)); + // Verify the non-AM container is acked to NM via the RMNodeEvent + Assert.assertEquals(container1.getId(), + containerPulledEvents.get(1).getContainers().get(0)); + Assert.assertTrue("No container shall be added to justFinishedContainers" + + " as soon as AM container exits", + applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + } + + /** + * Check a completed container is ACKed to NM for cleanup after the AM + * container has exited. + */ + @Test + public void testFinishedContainerAfterAMExit() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + // finish AM container to emulate AM exit event + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + ContainerId amContainerId = amContainer.getId(); + when(containerStatus1.getContainerId()).thenReturn(amContainerId); + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), + amContainer.getNodeId())); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + amContainer.getNodeId())); + + // Verify AM container is acked to NM via the RMNodeEvent immediately + ArgumentCaptor captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + Mockito.verify(rmnodeEventHandler).handle(captor.capture()); + Assert.assertEquals(amContainer.getId(), + captor.getValue().getContainers().get(0)); + + // Complete a non-AM container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn(containerId1); + 
when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + // Verify container is acked to NM via the RMNodeEvent immediately + captor = ArgumentCaptor.forClass( + RMNodeFinishedContainersPulledByAMEvent.class); + Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture()); + Assert.assertEquals(container1.getId(), + captor.getAllValues().get(1).getContainers().get(0)); + Assert.assertTrue("No container shall be added to justFinishedContainers" + + " after AM container exited", + applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertTrue( + getFinishedContainersSentToAM(applicationAttempt).isEmpty()); + } + private static List getFinishedContainersSentToAM( RMAppAttempt applicationAttempt) { List containers = new ArrayList();