diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java
index 819a607..6da0015 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
@@ -37,12 +36,15 @@
@Unstable
public abstract class AbstractResourceRequest {
+
+ protected ContainerId containerToUpdate = null;
+
/**
* Set the Resource capability of the request
* @param capability Resource capability of the request
*/
@Public
- @Stable
+ @Unstable
public abstract void setCapability(Resource capability);
/**
@@ -50,6 +52,18 @@
* @return Resource capability of the request
*/
@Public
- @Stable
+ @Unstable
public abstract Resource getCapability();
+
+ /**
+ * Get the {@link Priority} associated with this request.
+ * {@code SchedulerRequestKey.create} reads this value when it builds a
+ * scheduler key from a request.
+ * @return priority of the request
+ */
+ public abstract Priority getPriority();
+
+ /**
+ * Get the allocation request id associated with this request.
+ * {@code SchedulerRequestKey.create} reads this value when it builds a
+ * scheduler key from a request.
+ * @return allocation request id of the request
+ */
+ public abstract long getAllocationRequestId();
+
+  /**
+   * Get the id of the container that this request is meant to update.
+   * @return the container id to update, or {@code null} if none has been set
+   */
+  public ContainerId getContainerToUpdate() {
+    return this.containerToUpdate;
+  }
+
+  /**
+   * Record the id of the container that this request is meant to update.
+   * @param containerToUpdate id of the container to update; may be
+   *          {@code null} to clear it
+   */
+  public void setContainerToUpdate(final ContainerId containerToUpdate) {
+    this.containerToUpdate = containerToUpdate;
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
index 7102f7b..e7458cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java
@@ -87,6 +87,12 @@ public int hashCode() {
}
@Override
+  public String toString() {
+    // Rendered as: UpdateContainerError{reason=<reason>, req=<request>}
+    StringBuilder text = new StringBuilder("UpdateContainerError{reason=");
+    text.append(getReason());
+    text.append(", ").append("req=").append(getUpdateContainerRequest());
+    return text.append("}").toString();
+  }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
index e4f7a82..181dd08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java
@@ -64,6 +64,9 @@
@InterfaceStability.Unstable
public abstract class UpdateContainerRequest extends AbstractResourceRequest {
+ private Priority priority = Priority.UNDEFINED;
+ private long allocationRequestId = -1;
+
@InterfaceAudience.Public
@InterfaceStability.Unstable
public static UpdateContainerRequest newInstance(int version,
@@ -144,17 +147,37 @@ public static UpdateContainerRequest newInstance(int version,
public abstract void setExecutionType(ExecutionType executionType);
@Override
+  public Priority getPriority() {
+    // Holds Priority.UNDEFINED until setPriority is called.
+    return this.priority;
+  }
+
+  /**
+   * Set the priority to associate with this update request.
+   * @param priority priority later returned by {@link #getPriority()}
+   */
+  public void setPriority(final Priority priority) {
+    this.priority = priority;
+  }
+
+  /**
+   * Get the allocation request id associated with this update request.
+   * @return the stored id; {@code -1} until
+   *         {@link #setAllocationRequestId(long)} is called
+   */
+  @Override
+  public long getAllocationRequestId() {
+    return this.allocationRequestId;
+  }
+
+  /**
+   * Set the allocation request id to associate with this update request.
+   * @param allocationRequestId id later returned by
+   *          {@link #getAllocationRequestId()}
+   */
+  public void setAllocationRequestId(final long allocationRequestId) {
+    this.allocationRequestId = allocationRequestId;
+  }
+
+ @Override
public int hashCode() {
final int prime = 2153;
int result = 2459;
ContainerId cId = getContainerId();
ExecutionType execType = getExecutionType();
Resource capability = getCapability();
+ ContainerUpdateType updateType = getContainerUpdateType();
result =
prime * result + ((capability == null) ? 0 : capability.hashCode());
result = prime * result + ((cId == null) ? 0 : cId.hashCode());
result = prime * result + getContainerVersion();
result = prime * result + ((execType == null) ? 0 : execType.hashCode());
+ result = prime * result + ((updateType== null) ? 0 : updateType.hashCode());
return result;
}
@@ -208,6 +231,14 @@ public boolean equals(Object obj) {
} else if (!execType.equals(other.getExecutionType())) {
return false;
}
+ ContainerUpdateType updateType = getContainerUpdateType();
+ if (updateType == null) {
+ if (other.getContainerUpdateType() != null) {
+ return false;
+ }
+ } else if (!updateType.equals(other.getContainerUpdateType())) {
+ return false;
+ }
return true;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index c1300b2..6fd5228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -321,13 +321,21 @@ private Container buildContainer(long rmIdentifier,
// before accepting an ask)
Resource capability = normalizeCapability(appParams, rr);
+ return createContainer(
+ rmIdentifier, appParams.getContainerTokenExpiryInterval(),
+ SchedulerRequestKey.create(rr), userName, node, cId, capability);
+ }
+
+ private Container createContainer(long rmIdentifier, long tokenExpiry,
+ SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
+ ContainerId cId, Resource capability) {
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, 0, node.getNodeId().toString(), userName,
- capability, currTime + appParams.containerTokenExpiryInterval,
+ capability, currTime + tokenExpiry,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
- rr.getPriority(), currTime,
+ schedulerKey.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
@@ -336,9 +344,9 @@ private Container buildContainer(long rmIdentifier,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, node.getNodeId(), node.getHttpAddress(),
- capability, rr.getPriority(), containerToken,
+ capability, schedulerKey.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
- rr.getAllocationRequestId());
+ schedulerKey.getAllocationRequestId());
return container;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index a2f9f4d..1b1c5b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -150,8 +150,9 @@ public void addToOutstandingReqs(List resourceAsks) {
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
- LOG.info("# of outstandingOpReqs in ANY (at" +
- "priority = "+ schedulerKey.getPriority()
+ LOG.info("# of outstandingOpReqs in ANY (at "
+ + "priority = " + schedulerKey.getPriority()
+ + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
@@ -167,7 +168,8 @@ public void addToOutstandingReqs(List resourceAsks) {
public void matchAllocationToOutstandingRequest(Resource capability,
List allocatedContainers) {
for (Container c : allocatedContainers) {
- SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
+ SchedulerRequestKey schedulerKey =
+ SchedulerRequestKey.extractFrom(c);
Map asks =
outstandingOpReqs.get(schedulerKey);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 9b7edbe..4406389 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.scheduler;
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -31,15 +33,16 @@
private final Priority priority;
private final long allocationRequestId;
+ private final ContainerId containerToUpdate;
/**
* Factory method to generate a SchedulerRequestKey from a ResourceRequest.
* @param req ResourceRequest
* @return SchedulerRequestKey
*/
- public static SchedulerRequestKey create(ResourceRequest req) {
+ public static SchedulerRequestKey create(AbstractResourceRequest req) {
return new SchedulerRequestKey(req.getPriority(),
- req.getAllocationRequestId());
+ req.getAllocationRequestId(), req.getContainerToUpdate());
}
/**
@@ -50,12 +53,16 @@ public static SchedulerRequestKey create(ResourceRequest req) {
*/
public static SchedulerRequestKey extractFrom(Container container) {
return new SchedulerRequestKey(container.getPriority(),
- container.getAllocationRequestId());
+ container.getAllocationRequestId(), null);
}
- SchedulerRequestKey(Priority priority, long allocationRequestId) {
+
+
+ public SchedulerRequestKey(Priority priority, long allocationRequestId,
+ ContainerId containerToUpdate) {
this.priority = priority;
this.allocationRequestId = allocationRequestId;
+ this.containerToUpdate = containerToUpdate;
}
/**
@@ -76,6 +83,10 @@ public long getAllocationRequestId() {
return allocationRequestId;
}
+  /**
+   * Get the id of the container this key refers to for an update.
+   * @return the container id passed at construction; {@code null} for keys
+   *         built by {@code extractFrom(Container)}
+   */
+  public ContainerId getContainerToUpdate() {
+    return this.containerToUpdate;
+  }
+
@Override
public int compareTo(SchedulerRequestKey o) {
if (o == null) {
@@ -85,6 +96,15 @@ public int compareTo(SchedulerRequestKey o) {
return 1;
}
}
+
+ // Ensure updates are ranked higher
+ if (this.containerToUpdate == null && o.containerToUpdate != null) {
+ return -1;
+ }
+ if (this.containerToUpdate != null && o.containerToUpdate == null) {
+ return 1;
+ }
+
int priorityCompare = o.getPriority().compareTo(priority);
// we first sort by priority and then by allocationRequestId
if (priorityCompare != 0) {
@@ -107,16 +127,21 @@ public boolean equals(Object o) {
if (getAllocationRequestId() != that.getAllocationRequestId()) {
return false;
}
- return getPriority() != null ?
- getPriority().equals(that.getPriority()) :
- that.getPriority() == null;
+ if (!getPriority().equals(that.getPriority())) {
+ return false;
+ }
+ return containerToUpdate != null ?
+ containerToUpdate.equals(that.containerToUpdate) :
+ that.containerToUpdate == null;
}
@Override
public int hashCode() {
- int result = getPriority() != null ? getPriority().hashCode() : 0;
- result = 31 * result + (int) (getAllocationRequestId() ^ (
- getAllocationRequestId() >>> 32));
+ int result = priority != null ? priority.hashCode() : 0;
+ result = 31 * result + (int) (allocationRequestId ^ (allocationRequestId
+ >>> 32));
+ result = 31 * result + (containerToUpdate != null ? containerToUpdate
+ .hashCode() : 0);
return result;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 9d4c092..647d91d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -26,7 +26,12 @@
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@@ -69,9 +74,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
@@ -249,11 +255,117 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
addToAllocatedContainers(allocateResponse, oppContainers);
}
+ handleExecutionTypeUpdates(appAttempt, request, allocateResponse);
+
+ List promotedContainers =
+ appAttempt.pullContainersWithUpdatedExecType();
+ addToUpdatedContainers(allocateResponse,
+ ContainerUpdateType.UPDATE_EXECUTION_TYPE, promotedContainers);
+
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
+
super.allocateInternal(appAttemptId, request, allocateResponse);
}
+  /**
+   * Process the execution-type update requests carried by an allocate call.
+   * The requests are validated and split into promotions and demotions, each
+   * group is passed to its handler, and all collected errors are attached to
+   * the response.
+   *
+   * @param appAttempt attempt on whose behalf the updates are processed
+   * @param request allocate request carrying the update requests
+   * @param allocateResponse response that receives the update errors
+   */
+  private void handleExecutionTypeUpdates(
+      SchedulerApplicationAttempt appAttempt, AllocateRequest request,
+      AllocateResponse allocateResponse) {
+    List<UpdateContainerRequest> promotionRequests = new ArrayList<>();
+    List<UpdateContainerRequest> demotionRequests = new ArrayList<>();
+
+    // Validation fills the two lists and returns the rejected requests.
+    List<UpdateContainerError> updateContainerErrors = RMServerUtils
+        .validateAndSplitUpdateExecutionTypeRequests(rmContext,
+            request, promotionRequests, demotionRequests);
+
+    if (!promotionRequests.isEmpty()) {
+      LOG.info("Promotion Update requests : " + promotionRequests);
+    }
+    if (!demotionRequests.isEmpty()) {
+      LOG.info("Demotion Update requests : " + demotionRequests);
+    }
+
+    // Both handlers may append further errors to the same list.
+    handlePromotionRequests(
+        appAttempt, promotionRequests, updateContainerErrors);
+    handleDemotionRequests(
+        appAttempt, demotionRequests, updateContainerErrors);
+    addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
+  }
+
+  /**
+   * Start the demotion of every container named in the given requests.
+   * For each request whose container is still known to the scheduler, a
+   * demoted OPPORTUNISTIC container is created and registered with the
+   * attempt, provided {@code checkAndAddToOutstandingDecreases} accepts it;
+   * otherwise an {@code UPDATE_OUTSTANDING_ERROR} is recorded. Requests for
+   * containers the scheduler no longer knows are only logged.
+   *
+   * @param appAttempt attempt that owns the containers
+   * @param demotionRequests validated demotion requests
+   * @param updateContainerErrors list that receives errors for rejected
+   *          requests
+   */
+  private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+      List<UpdateContainerRequest> demotionRequests,
+      List<UpdateContainerError> updateContainerErrors) {
+    OpportunisticContainerContext oppCntxt =
+        appAttempt.getOpportunisticContainerContext();
+    for (UpdateContainerRequest uReq : demotionRequests) {
+      RMContainer rmContainer =
+          rmContext.getScheduler().getRMContainer(uReq.getContainerId());
+      if (rmContainer == null) {
+        LOG.warn("Cannot demote non-existent (or completed) Container ["
+            + uReq.getContainerId() + "]");
+        continue;
+      }
+      if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
+          rmContainer.getContainer())) {
+        RMContainer demotedRMContainer =
+            createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+        appAttempt.addToNewlyDemotedContainers(
+            uReq.getContainerId(), demotedRMContainer);
+      } else {
+        updateContainerErrors.add(UpdateContainerError.newInstance(
+            RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
+      }
+    }
+  }
+
+  /**
+   * Build and register the OPPORTUNISTIC stand-in for a container that is
+   * being demoted. The new container gets a freshly generated container id
+   * and copies node, HTTP address, resource, priority, allocation request id
+   * and version from the existing container; no container token is attached.
+   *
+   * @param appAttempt attempt that owns the container
+   * @param oppCntxt context supplying the container id generator
+   * @param rmContainer container being demoted
+   * @return the RMContainer created for the demoted container
+   */
+  private RMContainer createDemotedRMContainer(
+      SchedulerApplicationAttempt appAttempt,
+      OpportunisticContainerContext oppCntxt,
+      RMContainer rmContainer) {
+    // Read the container once so that every copied field comes from the same
+    // Container instance even if it is replaced concurrently.
+    Container current = rmContainer.getContainer();
+    SchedulerRequestKey sk = SchedulerRequestKey.extractFrom(current);
+    ContainerId demotedId = ContainerId.newContainerId(
+        appAttempt.getApplicationAttemptId(),
+        oppCntxt.getContainerIdGenerator().generateContainerId());
+    Container demotedContainer = BuilderUtils.newContainer(
+        demotedId, current.getNodeId(), current.getNodeHttpAddress(),
+        current.getResource(), sk.getPriority(), null,
+        ExecutionType.OPPORTUNISTIC, sk.getAllocationRequestId());
+    demotedContainer.setVersion(current.getVersion());
+    return createRmContainer(demotedContainer, false);
+  }
+
+  /**
+   * Register every promotion request as an outstanding increase on the
+   * attempt's update context. A request that
+   * {@code checkAndAddToOutstandingIncreases} rejects is reported with
+   * {@code UPDATE_OUTSTANDING_ERROR}. Requests for containers the scheduler
+   * no longer knows are only logged.
+   *
+   * @param applicationAttempt attempt that owns the containers
+   * @param updateContainerRequests validated promotion requests
+   * @param updateContainerErrors list that receives errors for rejected
+   *          requests
+   */
+  public void handlePromotionRequests(
+      SchedulerApplicationAttempt applicationAttempt,
+      List<UpdateContainerRequest> updateContainerRequests,
+      List<UpdateContainerError> updateContainerErrors) {
+    for (UpdateContainerRequest uReq : updateContainerRequests) {
+      RMContainer rmContainer =
+          rmContext.getScheduler().getRMContainer(uReq.getContainerId());
+      if (rmContainer == null) {
+        LOG.warn("Cannot promote non-existent (or completed) Container ["
+            + uReq.getContainerId() + "]");
+        continue;
+      }
+      // The node currently hosting the container is handed to the update
+      // context together with the request.
+      SchedulerNode schedulerNode = rmContext.getScheduler()
+          .getSchedulerNode(rmContainer.getContainer().getNodeId());
+
+      // Add only if no outstanding promote requests exist.
+      if (!applicationAttempt.getUpdateContext()
+          .checkAndAddToOutstandingIncreases(
+              rmContainer, schedulerNode, uReq)) {
+        updateContainerErrors.add(UpdateContainerError.newInstance(
+            RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
+      }
+    }
+  }
+
@Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
@@ -298,21 +410,29 @@ private void handleNewContainers(List allocContainers,
boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
- SchedulerApplicationAttempt appAttempt =
- ((AbstractYarnScheduler) rmContext.getScheduler())
- .getCurrentAttemptForContainer(container.getId());
- RMContainer rmContainer = new RMContainerImpl(container,
- appAttempt.getApplicationAttemptId(), container.getNodeId(),
- appAttempt.getUser(), rmContext, isRemotelyAllocated);
- appAttempt.addRMContainer(container.getId(), rmContainer);
- ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
- container.getNodeId()).allocateContainer(rmContainer);
+ RMContainer rmContainer =
+ createRmContainer(container, isRemotelyAllocated);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.ACQUIRED));
}
}
+  /**
+   * Create the RMContainer for the given container, add it to the current
+   * attempt of that container, and allocate it on the container's node.
+   *
+   * @param container container to wrap
+   * @param isRemotelyAllocated passed through to the RMContainerImpl
+   *          constructor
+   * @return the newly created RMContainer
+   */
+  private RMContainer createRmContainer(
+      Container container, boolean isRemotelyAllocated) {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rmContext.getScheduler();
+    SchedulerApplicationAttempt appAttempt =
+        scheduler.getCurrentAttemptForContainer(container.getId());
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.extractFrom(container);
+    RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
+        appAttempt.getApplicationAttemptId(), container.getNodeId(),
+        appAttempt.getUser(), rmContext, isRemotelyAllocated);
+    appAttempt.addRMContainer(container.getId(), rmContainer);
+    scheduler.getNode(container.getNodeId()).allocateContainer(rmContainer);
+    return rmContainer;
+  }
+
@Override
public void handle(SchedulerEvent event) {
switch (event.getType()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 74898ca..8e91cba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -39,6 +39,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -80,7 +82,7 @@
*/
public class RMServerUtils {
- private static final String UPDATE_OUTSTANDING_ERROR =
+ public static final String UPDATE_OUTSTANDING_ERROR =
"UPDATE_OUTSTANDING_ERROR";
private static final String INCORRECT_CONTAINER_VERSION_ERROR =
"INCORRECT_CONTAINER_VERSION_ERROR";
@@ -123,6 +125,52 @@
}
/**
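+ * Validate the execution-type update requests of an allocate call and split
+ * the valid ones into promotion and demotion requests.
+ *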
+ * @param rmContext RM context
+ * @param request Allocate Request
+ * @param promoteExecTypeReqs Promotion requests
+ * @param demoteExecTypeReqs Demotion requests
+ * @return List of container update Errors
+ */
+  public static List<UpdateContainerError>
+      validateAndSplitUpdateExecutionTypeRequests(RMContext rmContext,
+      AllocateRequest request,
+      List<UpdateContainerRequest> promoteExecTypeReqs,
+      List<UpdateContainerRequest> demoteExecTypeReqs) {
+    List<UpdateContainerError> errors = new ArrayList<>();
+    // Containers that already have an accepted update in this request.
+    Set<ContainerId> outstandingUpdate = new HashSet<>();
+    for (UpdateContainerRequest updateReq : request.getUpdateRequests()) {
+      // Only execution-type updates are handled here.
+      if (updateReq.getContainerUpdateType() !=
+          ContainerUpdateType.UPDATE_EXECUTION_TYPE) {
+        continue;
+      }
+      RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
+          updateReq.getContainerId());
+      String msg = validateContainerIdAndVersion(outstandingUpdate,
+          updateReq, rmContainer);
+      if (msg == null) {
+        ExecutionType original = rmContainer.getExecutionType();
+        ExecutionType target = updateReq.getExecutionType();
+        // A request whose target equals the current type is neither queued
+        // nor reported as an error.
+        if (target != original) {
+          if (target == ExecutionType.GUARANTEED &&
+              original == ExecutionType.OPPORTUNISTIC) {
+            // Promotion: carry over the priority and allocation request id
+            // the container was allocated with.
+            updateReq.setPriority(rmContainer.getAllocatedPriority());
+            updateReq.setAllocationRequestId(
+                rmContainer.getAllocatedSchedulerKey()
+                    .getAllocationRequestId());
+            promoteExecTypeReqs.add(updateReq);
+            outstandingUpdate.add(updateReq.getContainerId());
+          } else if (target == ExecutionType.OPPORTUNISTIC &&
+              original == ExecutionType.GUARANTEED) {
+            // Demotion.
+            demoteExecTypeReqs.add(updateReq);
+            outstandingUpdate.add(updateReq.getContainerId());
+          }
+        }
+      }
+      // Records an error only when validation produced a message.
+      checkAndcreateUpdateError(errors, updateReq, msg);
+    }
+    return errors;
+  }
+
+ /**
* Check if we have:
* - Request for same containerId and different target resource
* - If targetResources violates maximum/minimumAllocation
@@ -131,7 +179,7 @@
* @param maximumAllocation Maximum Allocation
* @param increaseResourceReqs Increase Resource Request
* @param decreaseResourceReqs Decrease Resource Request
- * @return List of container Errors
+ * @return List of container update Errors
*/
public static List
validateAndSplitUpdateResourceRequests(RMContext rmContext,
@@ -141,59 +189,76 @@
List errors = new ArrayList<>();
Set outstandingUpdate = new HashSet<>();
for (UpdateContainerRequest updateReq : request.getUpdateRequests()) {
- RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
- updateReq.getContainerId());
- String msg = null;
- if (rmContainer == null) {
- msg = INVALID_CONTAINER_ID;
- }
- // Only allow updates if the requested version matches the current
- // version
- if (msg == null && updateReq.getContainerVersion() !=
- rmContainer.getContainer().getVersion()) {
- msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
- + updateReq.getContainerVersion() + "|"
- + rmContainer.getContainer().getVersion();
- }
- // No more than 1 container update per request.
- if (msg == null &&
- outstandingUpdate.contains(updateReq.getContainerId())) {
- msg = UPDATE_OUTSTANDING_ERROR;
- }
- if (msg == null) {
- Resource original = rmContainer.getContainer().getResource();
- Resource target = updateReq.getCapability();
- if (Resources.fitsIn(target, original)) {
- // This is a decrease request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, false)) {
- decreaseResourceReqs.add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
+ if (updateReq.getContainerUpdateType() !=
+ ContainerUpdateType.UPDATE_EXECUTION_TYPE) {
+ RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
+ updateReq.getContainerId());
+ String msg = validateContainerIdAndVersion(outstandingUpdate,
+ updateReq, rmContainer);
+ if (msg == null) {
+ Resource original = rmContainer.getContainer().getResource();
+ Resource target = updateReq.getCapability();
+ if (Resources.fitsIn(target, original)) {
+ // This is a decrease request
+ if (validateIncreaseDecreaseRequest(rmContext, updateReq,
+ maximumAllocation, false)) {
+ decreaseResourceReqs.add(updateReq);
+ outstandingUpdate.add(updateReq.getContainerId());
+ } else {
+ msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+ }
} else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
- }
- } else {
- // This is an increase request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, true)) {
- increaseResourceReqs.add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
- } else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+ // This is an increase request
+ if (validateIncreaseDecreaseRequest(rmContext, updateReq,
+ maximumAllocation, true)) {
+ increaseResourceReqs.add(updateReq);
+ outstandingUpdate.add(updateReq.getContainerId());
+ } else {
+ msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+ }
}
}
- }
- if (msg != null) {
- UpdateContainerError updateError = RECORD_FACTORY
- .newRecordInstance(UpdateContainerError.class);
- updateError.setReason(msg);
- updateError.setUpdateContainerRequest(updateReq);
- errors.add(updateError);
+ checkAndcreateUpdateError(errors, updateReq, msg);
}
}
return errors;
}
+  /**
+   * Append an {@link UpdateContainerError} for the given request when a
+   * validation message is present; do nothing when {@code msg} is null.
+   *
+   * @param errors list that receives the new error
+   * @param updateReq request the error refers to
+   * @param msg reason for the rejection, or {@code null} if the request was
+   *          accepted
+   */
+  private static void checkAndcreateUpdateError(
+      List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
+      String msg) {
+    if (msg == null) {
+      return;
+    }
+    UpdateContainerError updateError = RECORD_FACTORY
+        .newRecordInstance(UpdateContainerError.class);
+    updateError.setReason(msg);
+    updateError.setUpdateContainerRequest(updateReq);
+    errors.add(updateError);
+  }
+
+  /**
+   * Run the checks shared by all container update requests.
+   * The checks are applied in order and the first failure wins: unknown
+   * container, container version mismatch, and an update for the same
+   * container already accepted earlier in this request.
+   *
+   * @param outstandingUpdate ids of containers that already have an accepted
+   *          update in the current request
+   * @param updateReq request being validated
+   * @param rmContainer container the request refers to, or {@code null} if
+   *          the scheduler does not know it
+   * @return the error reason, or {@code null} if all checks pass
+   */
+  private static String validateContainerIdAndVersion(
+      Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    if (rmContainer == null) {
+      return INVALID_CONTAINER_ID;
+    }
+    // Only allow updates if the requested version matches the current
+    // version
+    int requestedVersion = updateReq.getContainerVersion();
+    int currentVersion = rmContainer.getContainer().getVersion();
+    if (requestedVersion != currentVersion) {
+      return INCORRECT_CONTAINER_VERSION_ERROR + "|"
+          + requestedVersion + "|"
+          + currentVersion;
+    }
+    // No more than 1 container update per request.
+    if (outstandingUpdate.contains(updateReq.getContainerId())) {
+      return UPDATE_OUTSTANDING_ERROR;
+    }
+    return null;
+  }
+
/**
* Utility method to validate a list resource requests, by insuring that the
* requested memory/vcore is non-negative and not greater than max
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index dbc6169..41878d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -108,6 +108,8 @@
// Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED)
+ .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED,
+ RMContainerEventType.ACQUIRED)
.addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
RMContainerEventType.FINISHED, new FinishedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
@@ -125,6 +127,8 @@
.addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
+ RMContainerEventType.ACQUIRED)
+ .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
@@ -163,13 +167,13 @@
private final WriteLock writeLock;
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
- private final Container container;
private final RMContext rmContext;
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
private final String nodeLabelExpression;
+ private Container container;
private Resource reservedResource;
private NodeId reservedNode;
private SchedulerRequestKey reservedSchedulerKey;
@@ -188,44 +192,44 @@
private boolean isExternallyAllocated;
private SchedulerRequestKey allocatedSchedulerKey;
- public RMContainerImpl(Container container,
+ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
- this(container, appAttemptId, nodeId, user, rmContext, System
+ this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), "");
}
- public RMContainerImpl(Container container,
+ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, boolean isExternallyAllocated) {
- this(container, appAttemptId, nodeId, user, rmContext, System
+ this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), "", isExternallyAllocated);
}
private boolean saveNonAMContainerMetaInfo;
- public RMContainerImpl(Container container,
+ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, String nodeLabelExpression) {
- this(container, appAttemptId, nodeId, user, rmContext, System
+ this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), nodeLabelExpression);
}
- public RMContainerImpl(Container container,
+ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
- this(container, appAttemptId, nodeId, user, rmContext, creationTime,
- nodeLabelExpression, false);
+ this(container, schedulerKey, appAttemptId, nodeId, user, rmContext,
+ creationTime, nodeLabelExpression, false);
}
- public RMContainerImpl(Container container,
+ public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression,
boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.nodeId = nodeId;
this.container = container;
- this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
+ this.allocatedSchedulerKey = schedulerKey;
this.appAttemptId = appAttemptId;
this.user = user;
this.creationTime = creationTime;
@@ -272,10 +276,14 @@ public ApplicationAttemptId getApplicationAttemptId() {
}
@Override
- public Container getContainer() {
+ public synchronized Container getContainer() {
return this.container;
}
+ public synchronized void setContainer(Container container) {
+ this.container = container;
+ }
+
@Override
public RMContainerState getState() {
this.readLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c0cc6b0..ddc99bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -51,7 +52,6 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -81,6 +81,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -503,9 +505,11 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
RMContainer rmContainer =
- new RMContainerImpl(container, attemptId, node.getNodeID(),
- applications.get(attemptId.getApplicationId()).getUser(), rmContext,
- status.getCreationTime(), status.getNodeLabelExpression());
+ new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), attemptId,
+ node.getNodeID(), applications.get(
+ attemptId.getApplicationId()).getUser(), rmContext,
+ status.getCreationTime(), status.getNodeLabelExpression());
return rmContainer;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 0551df1..1b01502 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -55,7 +55,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application.
@@ -92,10 +91,11 @@
final Map>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
-
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
+ public final ContainerUpdateContext updateContext;
+
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) {
@@ -109,6 +109,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
this.appResourceUsage = appResourceUsage;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ updateContext = new ContainerUpdateContext(this);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@@ -376,6 +377,10 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
}
}
+ /**
+ * Returns the container-update tracker that the constructor created for
+ * this scheduling info.
+ * @return the {@link ContainerUpdateContext} held in {@code updateContext}.
+ */
+ public ContainerUpdateContext getUpdateContext() {
+ return updateContext;
+ }
+
/**
* The ApplicationMaster is updating resource requirements for the
* application, by asking for more resources and releasing resources acquired
@@ -413,29 +418,9 @@ public boolean updateResourceRequests(List requests,
}
// Update scheduling placement set
- for (Map.Entry> entry : dedupRequests.entrySet()) {
- SchedulerRequestKey schedulerRequestKey = entry.getKey();
-
- if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
- schedulerKeyToPlacementSets.put(schedulerRequestKey,
- new LocalitySchedulingPlacementSet<>(this));
- }
-
- // Update placement set
- ResourceRequestUpdateResult pendingAmountChanges =
- schedulerKeyToPlacementSets.get(schedulerRequestKey)
- .updateResourceRequests(
- entry.getValue().values(),
- recoverPreemptedRequestForAContainer);
-
- if (null != pendingAmountChanges) {
- updatePendingResources(
- pendingAmountChanges.getLastAnyResourceRequest(),
- pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
- queue.getMetrics());
- offswitchResourcesUpdated = true;
- }
- }
+ offswitchResourcesUpdated =
+ addToPlacementSets(
+ recoverPreemptedRequestForAContainer, dedupRequests);
return offswitchResourcesUpdated;
} finally {
@@ -443,6 +428,37 @@ public boolean updateResourceRequests(List requests,
}
}
+  /**
+   * Merges the given resource requests into the per-key placement sets,
+   * creating a placement set for any scheduler key that does not have one
+   * yet, and forwards every reported change in pending amount to
+   * {@code updatePendingResources}.
+   *
+   * NOTE(review): this method takes no lock itself. Besides
+   * {@code updateResourceRequests} it is also invoked from
+   * {@link ContainerUpdateContext} - confirm that path holds the lock that
+   * protects {@code schedulerKeyToPlacementSets}.
+   *
+   * @param recoverPreemptedRequestForAContainer passed through unchanged to
+   *          the placement set's {@code updateResourceRequests}.
+   * @param dedupRequests resource requests grouped by scheduler key and then
+   *          by resource name.
+   * @return true if at least one placement set reported a change in its
+   *         pending amount, false otherwise (including for an empty map).
+   */
+  boolean addToPlacementSets(
+      boolean recoverPreemptedRequestForAContainer,
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
+    boolean offswitchResourcesUpdated = false;
+    for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
+        dedupRequests.entrySet()) {
+      SchedulerRequestKey schedulerRequestKey = entry.getKey();
+
+      // Lazily create the placement set the first time a key is seen.
+      if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
+        schedulerKeyToPlacementSets.put(schedulerRequestKey,
+            new LocalitySchedulingPlacementSet<>(this));
+      }
+
+      // Update placement set
+      ResourceRequestUpdateResult pendingAmountChanges =
+          schedulerKeyToPlacementSets.get(schedulerRequestKey)
+              .updateResourceRequests(
+                  entry.getValue().values(),
+                  recoverPreemptedRequestForAContainer);
+
+      // A null result means the placement set reported nothing to propagate.
+      if (null != pendingAmountChanges) {
+        updatePendingResources(
+            pendingAmountChanges.getLastAnyResourceRequest(),
+            pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
+            queue.getMetrics());
+        offswitchResourcesUpdated = true;
+      }
+    }
+    return offswitchResourcesUpdated;
+  }
+
private void updatePendingResources(ResourceRequest lastRequest,
ResourceRequest request, SchedulerRequestKey schedulerKey,
QueueMetrics metrics) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
new file mode 100644
index 0000000..12a5600
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class encapsulates all outstanding container increase and decrease
+ * requests for an application.
+ *
+ * <p>Outstanding increases are indexed by scheduler key, then by the
+ * container's current resource, then by the node the container runs on.
+ * Outstanding decreases are a flat set of container ids. Every method that
+ * reads or writes these collections synchronizes on this instance.
+ */
+public class ContainerUpdateContext {
+
+  /**
+   * Sentinel returned by {@link #matchContainerToOutstandingIncreaseReq}
+   * when increases are outstanding for the scheduler key but none of them
+   * matches the resource and node of the allocated container.
+   */
+  public static final ContainerId UNDEFINED =
+      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+          ApplicationId.newInstance(-1, -1), -1), -1);
+  protected static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // Keep track of containers that are undergoing promotion:
+  // scheduler key -> container resource -> node -> ids of containers to update
+  private final Map<SchedulerRequestKey, Map<Resource,
+      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
+
+  private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+  private final AppSchedulingInfo appSchedulingInfo;
+
+  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
+    this.appSchedulingInfo = appSchedulingInfo;
+  }
+
+  /**
+   * Looks the container up in the outstanding-increase index using a key
+   * built from its priority, allocation request id and its own id.
+   * @param container Container.
+   * @return true if the container id is registered for that key, resource
+   *         and node.
+   */
+  private synchronized boolean isBeingIncreased(Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(
+            new SchedulerRequestKey(container.getPriority(),
+                container.getAllocationRequestId(), container.getId()));
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          return containerIds.contains(container.getId());
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Add the container to outstanding decreases.
+   * @param container Container.
+   * @return true if updated to outstanding decreases was successful; false
+   *         if an increase or a decrease is already outstanding for it.
+   */
+  public synchronized boolean checkAndAddToOutstandingDecreases(
+      Container container) {
+    if (isBeingIncreased(container)
+        || outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+    outstandingDecreases.add(container.getId());
+    return true;
+  }
+
+  /**
+   * Add the container to outstanding increases and hand matching resource
+   * requests (node, rack and ANY) to the application's placement sets.
+   * @param rmContainer RMContainer.
+   * @param schedulerNode SchedulerNode.
+   * @param updateRequest UpdateContainerRequest.
+   * @return true if updated to outstanding increases was successful; false
+   *         if an increase or a decrease is already outstanding for the
+   *         container, in which case nothing is modified.
+   */
+  public synchronized boolean checkAndAddToOutstandingIncreases(
+      RMContainer rmContainer, SchedulerNode schedulerNode,
+      UpdateContainerRequest updateRequest) {
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.create(updateRequest);
+    Container container = rmContainer.getContainer();
+
+    // Reject before touching the index: an empty map left behind under this
+    // key would make matchContainerToOutstandingIncreaseReq answer UNDEFINED
+    // for the key even though no increase was ever registered.
+    if (outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap == null) {
+      resourceMap = new HashMap<>();
+      outstandingIncreases.put(schedulerKey, resourceMap);
+    }
+    Map<NodeId, Set<ContainerId>> locationMap =
+        resourceMap.get(container.getResource());
+    if (locationMap == null) {
+      locationMap = new HashMap<>();
+      resourceMap.put(container.getResource(), locationMap);
+    }
+    Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+    if (containerIds == null) {
+      containerIds = new HashSet<>();
+      locationMap.put(container.getNodeId(), containerIds);
+    }
+    if (!containerIds.add(container.getId())) {
+      // An increase is already outstanding for this container.
+      return false;
+    }
+
+    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
+    String host = container.getNodeId().getHost();
+    String rack = schedulerNode.getRackName();
+
+    // One request per locality level, all for the same scheduler key.
+    Map<String, ResourceRequest> reqsByResourceName = new HashMap<>();
+    reqsByResourceName.put(host,
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, host));
+    reqsByResourceName.put(rack,
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, rack));
+    reqsByResourceName.put(ResourceRequest.ANY,
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, ResourceRequest.ANY));
+
+    Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+        new HashMap<>();
+    updateResReqs.put(schedulerKey, reqsByResourceName);
+    appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    return true;
+  }
+
+  /**
+   * Determines the capability to request for the update.
+   * @param updateReq UpdateContainerRequest.
+   * @param rmContainer RMContainer being updated.
+   * @return the container's current resource for an execution type update;
+   *         null for every other update type (see TODO below).
+   */
+  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    if (updateReq.getContainerUpdateType() ==
+        ContainerUpdateType.UPDATE_EXECUTION_TYPE) {
+      return rmContainer.getContainer().getResource();
+    }
+    // TODO: Fix this for container increase..
+    //       This has to equal the Resources in excess of fitsIn()
+    //       for container increase and is equal to the container total
+    //       resource for Promotion.
+    return null;
+  }
+
+  /**
+   * Fills in the supplied request record: a single GUARANTEED container with
+   * strict locality for the given resource name, carrying the priority and
+   * node label expression of the container being updated.
+   * @param schedulerRequestKey supplies the allocation request id and the
+   *          id of the container to update.
+   * @param resToIncrease capability to request.
+   * @param rr record to populate; also the return value.
+   * @param rmContainer container being updated.
+   * @param resourceName host, rack or ANY.
+   * @return the populated {@code rr}.
+   */
+  private static ResourceRequest createResourceReqForIncrease(
+      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
+      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
+    rr.setResourceName(resourceName);
+    rr.setNumContainers(1);
+    rr.setRelaxLocality(false);
+    rr.setPriority(rmContainer.getContainer().getPriority());
+    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
+    rr.setCapability(resToIncrease);
+    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
+    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
+        ExecutionType.GUARANTEED, true));
+    rr.setContainerToUpdate(schedulerRequestKey.getContainerToUpdate());
+    return rr;
+  }
+
+  /**
+   * Remove Container from outstanding increases / decreases. Calling this
+   * method essentially completes the update process. Index levels that
+   * become empty are pruned.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param container Container.
+   */
+  public synchronized void removeFromOutstandingUpdate(
+      SchedulerRequestKey schedulerKey, Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          containerIds.remove(container.getId());
+          if (containerIds.isEmpty()) {
+            locationMap.remove(container.getNodeId());
+          }
+        }
+        if (locationMap.isEmpty()) {
+          resourceMap.remove(container.getResource());
+        }
+      }
+      if (resourceMap.isEmpty()) {
+        outstandingIncreases.remove(schedulerKey);
+      }
+    }
+    outstandingDecreases.remove(container.getId());
+  }
+
+  /**
+   * Check if a new container is to be matched up against an outstanding
+   * Container increase request.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param container Container.
+   * @return the id of a container waiting for an increase with the same
+   *         key, resource and node; {@link #UNDEFINED} if increases are
+   *         outstanding for the key but none matches; null if nothing is
+   *         outstanding for the key.
+   */
+  public synchronized ContainerId matchContainerToOutstandingIncreaseReq(
+      SchedulerRequestKey schedulerKey, Container container) {
+    ContainerId retVal = null;
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          retVal = containerIds.iterator().next();
+        }
+      }
+    }
+    // Allocation happened on NM on the same host, but not on the NM
+    // we need.. We need to signal that this container has to be released.
+    if (resourceMap != null && retVal == null) {
+      return UNDEFINED;
+    }
+    return retVal;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index e94d800..76cb142 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -133,6 +135,9 @@
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
protected List newlyAllocatedContainers = new ArrayList<>();
+ protected Map newlyPromotedContainers = new HashMap<>();
+ protected Map newlyDemotedContainers = new HashMap<>();
+ protected List tempContainerToKill = new ArrayList<>();
protected Map newlyDecreasedContainers = new HashMap<>();
protected Map newlyIncreasedContainers = new HashMap<>();
protected Set updatedNMTokens = new HashSet<>();
@@ -247,6 +252,10 @@ public AppSchedulingInfo getAppSchedulingInfo() {
return this.appSchedulingInfo;
}
+ /**
+ * Convenience accessor that delegates to this attempt's
+ * {@code appSchedulingInfo}.
+ * @return the {@link ContainerUpdateContext} returned by
+ * {@code appSchedulingInfo.getUpdateContext()}.
+ */
+ public ContainerUpdateContext getUpdateContext() {
+ return this.appSchedulingInfo.getUpdateContext();
+ }
+
/**
* Is this application pending?
* @return true if it is else false.
@@ -537,8 +546,9 @@ public RMContainer reserve(SchedulerNode node,
writeLock.lock();
// Create RMContainer if necessary
if (rmContainer == null) {
- rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+ rmContainer = new RMContainerImpl(container, schedulerKey,
+ getApplicationAttemptId(), node.getNodeID(),
+ appSchedulingInfo.getUser(), rmContext);
}
if (rmContainer.getState() == RMContainerState.NEW) {
attemptResourceUsage.incReserved(node.getPartition(),
@@ -635,10 +645,10 @@ public Resource getCurrentConsumption() {
}
private Container updateContainerAndNMToken(RMContainer rmContainer,
- boolean newContainer, boolean increasedContainer) {
+ ContainerUpdateType updateType) {
Container container = rmContainer.getContainer();
ContainerType containerType = ContainerType.TASK;
- if (!newContainer) {
+ if (updateType != null) {
container.setVersion(container.getVersion() + 1);
}
// The working knowledge is that masterContainer for AM is null as it
@@ -662,12 +672,14 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
return null;
}
- if (newContainer) {
+ if (updateType == null ||
+ ContainerUpdateType.UPDATE_EXECUTION_TYPE == updateType) {
rmContainer.handle(new RMContainerEvent(
rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
- rmContainer.getContainerId(), increasedContainer));
+ rmContainer.getContainerId(),
+ ContainerUpdateType.INCREASE_RESOURCE == updateType));
}
return container;
}
@@ -699,8 +711,8 @@ private void updateNMToken(Container container) {
Iterator i = newlyAllocatedContainers.iterator();
while (i.hasNext()) {
RMContainer rmContainer = i.next();
- Container updatedContainer = updateContainerAndNMToken(rmContainer,
- true, false);
+ Container updatedContainer =
+ updateContainerAndNMToken(rmContainer, null);
// Only add container to return list when it's not null.
// updatedContainer could be null when generate token failed, it can be
// caused by DNS resolving failed.
@@ -713,9 +725,121 @@ private void updateNMToken(Container container) {
} finally {
writeLock.unlock();
}
+ }
+ /**
+ * Registers a demoted container so that the next call to
+ * {@code pullContainersWithUpdatedExecType} picks it up. That method uses
+ * the key as the id of the existing container to swap with and the value as
+ * the container carrying the new attributes.
+ *
+ * NOTE(review): {@code newlyDemotedContainers} is a plain HashMap and this
+ * method takes no lock, while the pull method iterates the map under
+ * {@code writeLock} - confirm that callers provide the synchronization.
+ *
+ * @param containerId id under which the entry is stored.
+ * @param rmContainer container stored for that id.
+ */
+ public void addToNewlyDemotedContainers(ContainerId containerId,
+ RMContainer rmContainer) {
+ newlyDemotedContainers.put(containerId, rmContainer);
}
-
+
+  /**
+   * Files a freshly allocated container into the right bucket: the regular
+   * newly-allocated list, the promoted map (when it satisfies an outstanding
+   * increase request), or the kill list (when it was allocated for such a
+   * request but does not match it).
+   * @param rmContainer the container that was just allocated.
+   */
+  protected synchronized void addToNewlyAllocatedContainers(
+      RMContainer rmContainer) {
+    // Without an opportunistic container context no matching is attempted.
+    ContainerId matchedContainerId = null;
+    if (oppContainerContext != null) {
+      matchedContainerId =
+          getUpdateContext().matchContainerToOutstandingIncreaseReq(
+              rmContainer.getAllocatedSchedulerKey(),
+              rmContainer.getContainer());
+    }
+
+    if (matchedContainerId == null) {
+      newlyAllocatedContainers.add(rmContainer);
+    } else if (ContainerUpdateContext.UNDEFINED == matchedContainerId) {
+      // This is a spurious allocation: relaxLocality = false resulted in the
+      // Container being allocated on an NM on the same host but not on the
+      // NM running the container to be updated. Can happen if more than one
+      // NM exists on the same host.. usually occurs when using
+      // MiniYARNCluster to test.
+      tempContainerToKill.add(rmContainer);
+    } else {
+      newlyPromotedContainers.put(matchedContainerId, rmContainer);
+    }
+  }
+
+  /**
+   * A container is promoted if its executionType is changed from
+   * OPPORTUNISTIC to GUARANTEED. It is demoted if the change is from
+   * GUARANTEED to OPPORTUNISTIC.
+   *
+   * <p>Drains the promoted and demoted maps: each entry's temporary
+   * container is swapped with the existing container it was matched to, the
+   * outstanding update is closed, and the temporary container is queued for
+   * release. All queued temporary containers are then completed with a KILL
+   * event. Returns an empty list without doing any work when there is no
+   * opportunistic container context.
+   *
+   * @return Newly Promoted and Demoted containers
+   */
+  public List<Container> pullContainersWithUpdatedExecType() {
+    List<Container> updatedContainers = new ArrayList<>();
+    if (oppContainerContext == null) {
+      return updatedContainers;
+    }
+    try {
+      writeLock.lock();
+      for (Map<ContainerId, RMContainer> newlyUpdatedContainers :
+          Arrays.asList(newlyPromotedContainers, newlyDemotedContainers)) {
+        Iterator<Map.Entry<ContainerId, RMContainer>> i =
+            newlyUpdatedContainers.entrySet().iterator();
+        while (i.hasNext()) {
+          Map.Entry<ContainerId, RMContainer> entry = i.next();
+          ContainerId matchedContainerId = entry.getKey();
+          RMContainer rmContainer = entry.getValue();
+
+          // swap containers
+          RMContainer existingRMContainer = swapContainer(
+              rmContainer, matchedContainerId);
+          // swapContainer returns null when the matched container is no
+          // longer known to this attempt; there is then nothing to update
+          // and only the temporary container has to be released.
+          if (existingRMContainer != null) {
+            getUpdateContext().removeFromOutstandingUpdate(
+                rmContainer.getAllocatedSchedulerKey(),
+                existingRMContainer.getContainer());
+            Container updatedContainer = updateContainerAndNMToken(
+                existingRMContainer,
+                ContainerUpdateType.UPDATE_EXECUTION_TYPE);
+            // updatedContainer is null when token generation failed; never
+            // hand a null element back to the caller.
+            if (updatedContainer != null) {
+              updatedContainers.add(updatedContainer);
+            }
+          }
+
+          // Mark container for release (set RRs to null, so RM does not think
+          // it is a recoverable container)
+          ((RMContainerImpl) rmContainer).setResourceRequests(null);
+          tempContainerToKill.add(rmContainer);
+          i.remove();
+        }
+      }
+      // Release all temporary containers
+      Iterator<RMContainer> tempIter = tempContainerToKill.iterator();
+      while (tempIter.hasNext()) {
+        RMContainer c = tempIter.next();
+        ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
+            SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
+                SchedulerUtils.UPDATED_CONTAINER),
+            RMContainerEventType.KILL);
+        tempIter.remove();
+      }
+      return updatedContainers;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+ /**
+ * Exchanges resource and execution type between a temporary container and
+ * the existing container it was matched to. The existing RMContainer gets a
+ * rebuilt Container record that keeps its id, node, HTTP address, priority,
+ * allocation request id and version but carries the temporary container's
+ * resource and execution type; the temporary container is given the
+ * existing container's previous resource and execution type.
+ * @param rmContainer temporary container holding the new attributes.
+ * @param matchedContainerId id of the existing container to update.
+ * @return the existing RMContainer, or null if {@code getRMContainer} finds
+ * none for {@code matchedContainerId} (nothing is modified in that case).
+ */
+ private RMContainer swapContainer(RMContainer rmContainer, ContainerId
+ matchedContainerId) {
+ RMContainer existingRMContainer =
+ getRMContainer(matchedContainerId);
+ if (existingRMContainer != null) {
+ // Swap updated container with the existing container
+ Container updatedContainer = rmContainer.getContainer();
+
+ // The sixth argument is passed as null (presumably the container token -
+ // confirm against Container.newInstance).
+ Container newContainer = Container.newInstance(matchedContainerId,
+ existingRMContainer.getContainer().getNodeId(),
+ existingRMContainer.getContainer().getNodeHttpAddress(),
+ updatedContainer.getResource(),
+ existingRMContainer.getContainer().getPriority(), null,
+ updatedContainer.getExecutionType());
+ newContainer.setAllocationRequestId(
+ existingRMContainer.getContainer().getAllocationRequestId());
+ newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+ // Must run before setContainer below: these calls still read the old
+ // record of the existing container.
+ rmContainer.getContainer().setResource(
+ existingRMContainer.getContainer().getResource());
+ rmContainer.getContainer().setExecutionType(
+ existingRMContainer.getContainer().getExecutionType());
+
+ ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+ }
+ return existingRMContainer;
+ }
+
private List pullNewlyUpdatedContainers(
Map updatedContainerMap, boolean increase) {
try {
@@ -728,7 +852,8 @@ private void updateNMToken(Container container) {
while (i.hasNext()) {
RMContainer rmContainer = i.next().getValue();
Container updatedContainer = updateContainerAndNMToken(rmContainer,
- false, increase);
+ increase ? ContainerUpdateType.INCREASE_RESOURCE :
+ ContainerUpdateType.DECREASE_RESOURCE);
if (updatedContainer != null) {
returnContainerList.add(updatedContainer);
i.remove();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index b227523..f5539fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -57,6 +57,9 @@
public static final String RELEASED_CONTAINER =
"Container released by application";
+
+  // Diagnostic attached to the temporary containers that are released once
+  // an ExecutionType update has been applied to the original container.
+  public static final String UPDATED_CONTAINER =
+      "Temporary container killed by application for ExecutionType update";
public static final String LOST_CONTAINER =
"Container released on a *lost* node";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index ea1ae60..1e8fccf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -43,7 +44,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1eb48bb..eeb0815 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -746,7 +746,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
// When reserving container
RMContainer updatedContainer = reservedContainer;
if (updatedContainer == null) {
- updatedContainer = new RMContainerImpl(container,
+ updatedContainer = new RMContainerImpl(container, schedulerKey,
application.getApplicationAttemptId(), node.getNodeID(),
application.getAppSchedulingInfo().getUser(), rmContext);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index b14bc20..7d4dc4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -222,7 +222,7 @@ public RMContainer allocate(FiCaSchedulerNode node,
}
// Create RMContainer
- RMContainer rmContainer = new RMContainerImpl(container,
+ RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
@@ -554,7 +554,7 @@ public void apply(Resource cluster,
// Update this application for the allocated container
if (!allocation.isIncreasedAllocation()) {
// Allocate a new container
- newlyAllocatedContainers.add(rmContainer);
+ addToNewlyAllocatedContainers(rmContainer);
liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index a9591a5..7c69249 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -448,13 +448,13 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
}
// Create RMContainer
- rmContainer = new RMContainerImpl(container,
+ rmContainer = new RMContainerImpl(container, schedulerKey,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
// Add it to allContainers list.
- newlyAllocatedContainers.add(rmContainer);
+ addToNewlyAllocatedContainers(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index e60f70e..af5efc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -68,7 +68,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
- this.getApplicationAttemptId(), node.getNodeID(),
+ schedulerKey, this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@@ -76,7 +76,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
// Add it to allContainers list.
- newlyAllocatedContainers.add(rmContainer);
+ addToNewlyAllocatedContainers(rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
index 9dbf024..a17a384 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
@@ -177,11 +177,16 @@ private void decrementOutstanding(ResourceRequest offSwitchRequest) {
offSwitchRequest.getCapability());
}
- private ResourceRequest cloneResourceRequest(ResourceRequest request) {
- ResourceRequest newRequest =
- ResourceRequest.newInstance(request.getPriority(),
- request.getResourceName(), request.getCapability(), 1,
- request.getRelaxLocality(), request.getNodeLabelExpression());
+ /**
+ * Builds a single-container copy of the given request. Priority,
+ * allocationRequestId, resource name, capability, relaxLocality and node
+ * label expression are copied from the source, numContainers is fixed at 1,
+ * and the containerToUpdate reference is carried over to the copy.
+ *
+ * NOTE(review): no execution type is set on the builder here - confirm that
+ * the builder default is what clones are expected to carry.
+ *
+ * @param request the request to copy
+ * @return a new ResourceRequest asking for exactly one container
+ */
+ public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+ ResourceRequest newRequest = ResourceRequest.newBuilder()
+ .priority(request.getPriority())
+ .allocationRequestId(request.getAllocationRequestId())
+ .resourceName(request.getResourceName())
+ .capability(request.getCapability())
+ .numContainers(1)
+ .relaxLocality(request.getRelaxLocality())
+ .nodeLabelExpression(request.getNodeLabelExpression()).build();
+ newRequest.setContainerToUpdate(request.getContainerToUpdate());
return newRequest;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 593de08..fbeca7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -251,6 +251,13 @@ public AllocateResponse sendContainerResizingRequest(
return allocate(req);
}
+  /**
+   * Sends an allocate call whose only payload is the given container update
+   * requests; response id is 0, progress is 0 and the ask, release and
+   * blacklist arguments are all null.
+   *
+   * @param updateRequests the update requests to forward to the RM
+   * @return the RM's response to the allocate call
+   * @throws Exception if the allocate call fails
+   */
+  public AllocateResponse sendContainerUpdateRequest(
+      List updateRequests) throws Exception {
+    return allocate(AllocateRequest.newInstance(0, 0F, null, null, null,
+        updateRequests));
+  }
+
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws Exception {
UserGroupInformation ugi =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 32cdb1b..2d76127 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -195,6 +195,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception {
+ return nodeHeartbeat(updatedStats, Collections.emptyList(),
+ isHealthy, ++responseId);
+ }
+
public NodeHeartbeatResponse nodeHeartbeat(List updatedStats,
List increasedConts, boolean isHealthy, int resId)
throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index cb57f39..c135384 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -142,6 +142,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
@@ -1072,7 +1074,8 @@ public ApplicationReport createAndGetApplicationReport(
Container container = Container.newInstance(
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
- attemptId, null, "", rmContext));
+ SchedulerRequestKey.extractFrom(container), attemptId, null, "",
+ rmContext));
Map attempts =
new HashMap();
attempts.put(attemptId, rmAppAttemptImpl);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 73d9e5c..1fcfdee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -34,11 +34,15 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -64,8 +68,11 @@
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -75,13 +82,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
/**
@@ -91,8 +102,10 @@
private static final int GB = 1024;
- @Test(timeout = 60000)
- public void testNodeRemovalDuringAllocate() throws Exception {
+ private MockRM rm;
+
+ @Before
+ public void createAndStartRM() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration(csConf);
@@ -102,8 +115,445 @@ public void testNodeRemovalDuringAllocate() throws Exception {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
- MockRM rm = new MockRM(conf);
+ rm = new MockRM(conf);
rm.start();
+ }
+
+  /** Stops the MockRM held in {@code rm} after each test, if one is set. */
+  @After
+  public void stopRM() {
+    if (rm == null) {
+      return;
+    }
+    rm.stop();
+  }
+
+  /**
+   * Promotes an OPPORTUNISTIC container to GUARANTEED while the RM still sees
+   * it as ACQUIRED, then demotes it again, checking the update errors and the
+   * root queue metrics at each step.
+   */
+  @Test(timeout = 600000)
+  public void testContainerPromoteAndDemoteBeforeContainerStart()
+      throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
+    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode3)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode4)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
+    // All nodes 1 - 4 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+    MockNM sameHostDiffNode = null;
+    for (NodeId n : nodes.keySet()) {
+      if (n.getHost().equals(allocNode.getNodeId().getHost()) &&
+          n.getPort() != allocNode.getNodeId().getPort()) {
+        sameHostDiffNode = nodes.get(n);
+      }
+    }
+    // Fail with a clear message instead of an NPE if no such node exists.
+    Assert.assertNotNull("No second NM found on host "
+        + allocNode.getNodeId().getHost(), sameHostDiffNode);
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    // A heartbeat from a different NM on the same host must not result in
+    // the promotion being allocated
+    sameHostDiffNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+
+    // Promotion is still outstanding, so the metrics must be unchanged
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Send Promotion req again with incorrect version...
+    // this should also result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(1,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Ensure after correct node heartbeats, we should get the allocation
+    allocNode.nodeHeartbeat(true);
+    Thread.sleep(200);
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(container.getId(), uc.getId());
+    Assert.assertEquals(container.getVersion() + 1, uc.getVersion());
+
+    // Verify metrics after promotion:
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    Thread.sleep(200);
+
+    // Verify that the container is still in ACQUIRED state wrt the RM.
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+    // Now demote the container back..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(),
+            uc.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC)));
+    // This should happen in the same heartbeat..
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType());
+    Assert.assertEquals(container.getId(), uc.getId());
+    Assert.assertEquals(container.getVersion() + 2, uc.getVersion());
+
+    // Verify metrics after demotion:
+    // Everything should have reverted to what it was
+    verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
+  }
+
+  /**
+   * Promotes an OPPORTUNISTIC container to GUARANTEED after its NM has
+   * reported it RUNNING, and verifies the promoted container is handed back
+   * after a further heartbeat from that node and stays RUNNING.
+   */
+  @Test(timeout = 60000)
+  public void testContainerPromoteAfterContainerStart() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        container.getId().getApplicationAttemptId()).getRMContainer(
+        container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify Metrics After OPP allocation (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    // Promotion is still outstanding, so the metrics must be unchanged
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req again... this should result in update error
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Heartbeat again from the container's NM (still reporting it RUNNING);
+    // the next allocate call is expected to return the promoted container
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    Container uc =
+        allocateResponse.getUpdatedContainers().get(0).getContainer();
+    Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
+    Assert.assertEquals(container.getId(), uc.getId());
+    Assert.assertEquals(container.getVersion() + 1, uc.getVersion());
+
+    // Verify that the Container is still in RUNNING state wrt RM..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Verify metrics after promotion:
+    // Allocated cores+mem should have increased, available should decrease
+    verifyMetrics(metrics, 6144, 6, 2048, 2, 2);
+  }
+
+  /**
+   * Requests promotion of an OPPORTUNISTIC container that has already
+   * completed, and verifies the RM answers with an INVALID_CONTAINER_ID
+   * update error while the root queue metrics stay unchanged.
+   */
+  @Test(timeout = 600000)
+  public void testContainerPromoteAfterContainerComplete() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    Thread.sleep(1000);
+
+    QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
+        .getMetrics();
+
+    // Verify Metrics
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    AllocateResponse allocateResponse = am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))),
+        null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        container.getId().getApplicationAttemptId()).getRMContainer(
+        container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Container Completed in the NM
+    allocNode.nodeHeartbeat(Arrays.asList(
+        ContainerStatus.newInstance(container.getId(),
+            ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
+        true);
+    Thread.sleep(200);
+
+    // Verify that container has been removed..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(
+        container.getId().getApplicationAttemptId()).getRMContainer(
+        container.getId());
+    Assert.assertNull(rmContainer);
+
+    // Verify metrics after the OPP container completed
+    // (Nothing should change)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+
+    // Send Promotion req... this should result in update error
+    // Since the container doesn't exist anymore..
+    allocateResponse = am1.sendContainerUpdateRequest(
+        Arrays.asList(UpdateContainerRequest.newInstance(0,
+            container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED)));
+
+    Assert.assertEquals(1,
+        allocateResponse.getCompletedContainersStatuses().size());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getCompletedContainersStatuses().get(0)
+            .getContainerId());
+    Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
+    Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
+    Assert.assertEquals("INVALID_CONTAINER_ID",
+        allocateResponse.getUpdateErrors().get(0).getReason());
+    Assert.assertEquals(container.getId(),
+        allocateResponse.getUpdateErrors().get(0)
+            .getUpdateContainerRequest().getContainerId());
+
+    // Verify metrics after the rejected promotion
+    // (Nothing should change again)
+    verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
+  }
+
+  /**
+   * Asserts that the given queue metrics report the expected available and
+   * allocated resources. Every assertion carries the metric's name so a
+   * failure says which value mismatched.
+   *
+   * @param metrics the queue metrics to inspect
+   * @param availableMB expected available memory, in MB
+   * @param availableVirtualCores expected available virtual cores
+   * @param allocatedMB expected allocated memory, in MB
+   * @param allocatedVirtualCores expected allocated virtual cores
+   * @param allocatedContainers expected number of allocated containers
+   */
+  private void verifyMetrics(QueueMetrics metrics, long availableMB,
+      int availableVirtualCores, long allocatedMB,
+      int allocatedVirtualCores, int allocatedContainers) {
+    Assert.assertEquals("availableMB", availableMB, metrics.getAvailableMB());
+    Assert.assertEquals("availableVirtualCores", availableVirtualCores,
+        metrics.getAvailableVirtualCores());
+    Assert.assertEquals("allocatedMB", allocatedMB, metrics.getAllocatedMB());
+    Assert.assertEquals("allocatedVirtualCores", allocatedVirtualCores,
+        metrics.getAllocatedVirtualCores());
+    Assert.assertEquals("allocatedContainers", allocatedContainers,
+        metrics.getAllocatedContainers());
+  }
+
+ @Test(timeout = 60000)
+ public void testNodeRemovalDuringAllocate() throws Exception {
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index e737a84..893f802 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -114,7 +115,8 @@ public void testReleaseWhileRunning() {
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
- RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
@@ -216,7 +218,8 @@ public void testExpireWhileRunning() {
when(rmContext.getYarnConfiguration()).thenReturn(conf);
when(rmContext.getRMApps()).thenReturn(appMap);
- RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index a6ae0c2..cf91841 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -55,6 +55,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -280,7 +282,8 @@ public void testSortedQueues() throws Exception {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
Container container=TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB), priority);
- RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
node_0.getNodeID(), "user", rmContext);
// Assign {1,2,3,4} 1GB containers respectively to queues
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c9eb8b3..f9bf89d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -923,13 +923,15 @@ public void testGetAppToUnreserve() throws Exception {
Container container = TestUtils.getMockContainer(containerId,
node_1.getNodeID(), Resources.createResource(2*GB),
priorityMap.getPriority());
- RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
node_1.getNodeID(), "user", rmContext);
Container container_1 = TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB),
priorityMap.getPriority());
- RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId,
+ RMContainer rmContainer_1 = new RMContainerImpl(container_1,
+ SchedulerRequestKey.extractFrom(container_1), appAttemptId,
node_0.getNodeID(), "user", rmContext);
// no reserved containers
@@ -996,7 +998,8 @@ public void testFindNodeToUnreserve() throws Exception {
Container container = TestUtils.getMockContainer(containerId,
node_1.getNodeID(), Resources.createResource(2*GB),
priorityMap.getPriority());
- RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+ RMContainer rmContainer = new RMContainerImpl(container,
+ SchedulerRequestKey.extractFrom(container), appAttemptId,
node_1.getNodeID(), "user", rmContext);
// nothing reserved