diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 7030712..f4579ab 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -462,7 +463,8 @@ private void addResourceRequest(Priority priority, String resourceName,
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
- remoteRequest.setExecutionType(executionType);
+ remoteRequest.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(executionType, true));
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
new file mode 100644
index 0000000..f553a44
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An object of this class represents a specification of the execution
+ * guarantee of the Containers associated with a ResourceRequest. It consists
+ * of an ExecutionType as well as flag that explicitly asks the
+ * configuredScheduler to return Containers of exactly the Execution Type
+ * requested.
+ */
+@Public
+@Evolving
+public abstract class ExecutionTypeRequest {
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance() {
+ return newInstance(ExecutionType.GUARANTEED, false);
+ }
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance(ExecutionType execType,
+ boolean ensureExecutionType) {
+ ExecutionTypeRequest executionTypeRequest =
+ Records.newRecord(ExecutionTypeRequest.class);
+ executionTypeRequest.setExecutionType(execType);
+ executionTypeRequest.setEnforceExecutionType(ensureExecutionType);
+ return executionTypeRequest;
+ }
+
+ /**
+ * Set the ExecutionType of the requested container.
+ *
+ * @param execType
+ * ExecutionType of the requested container
+ */
+ @Public
+ public abstract void setExecutionType(ExecutionType execType);
+
+ /**
+ * Get ExecutionType.
+ *
+ * @return ExecutionType.
+ */
+ @Public
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set to true to explicitly ask that the Scheduling Authority return
+ * Containers of exactly the Execution Type requested.
+ * @param enforceExecutionType whether ExecutionType request should be
+ * strictly honored.
+ */
+ @Public
+ public abstract void setEnforceExecutionType(boolean enforceExecutionType);
+
+
+ /**
+ * Get whether Scheduling Authority should return Containers of exactly the
+ * Execution Type requested for this ResourceRequest.
+ * Defaults to false.
+ * @return whether ExecutionType request should be strictly honored
+ */
+ @Public
+ public abstract boolean getEnforceExecutionType();
+
+ @Override
+ public int hashCode() {
+ final int prime = 2153;
+ int result = 2459;
+ ExecutionType executionType = getExecutionType();
+ boolean ensureExecutionType = getEnforceExecutionType();
+ result = prime * result + ((executionType == null) ? 0 :
+ executionType.hashCode());
+ result = prime * result + (ensureExecutionType ? 0 : 1);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ExecutionTypeRequest other = (ExecutionTypeRequest) obj;
+ ExecutionType executionType = getExecutionType();
+ if (executionType == null) {
+ if (other.getExecutionType() != null) {
+ return false;
+ }
+ } else if (executionType != other.getExecutionType()) {
+ return false;
+ }
+ boolean enforceExecutionType = getEnforceExecutionType();
+ return enforceExecutionType == other.getEnforceExecutionType();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 8c1fd8d..fbe7e58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -80,14 +80,14 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
return newInstance(priority, hostName, capability, numContainers,
- relaxLocality, labelExpression, ExecutionType.GUARANTEED);
+ relaxLocality, labelExpression, ExecutionTypeRequest.newInstance());
}
@Public
- @Stable
+ @Evolving
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
- labelExpression, ExecutionType execType) {
+ labelExpression, ExecutionTypeRequest executionTypeRequest) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
@@ -95,7 +95,7 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
- request.setExecutionType(execType);
+ request.setExecutionTypeRequest(executionTypeRequest);
return request;
}
@@ -233,14 +233,16 @@ public static boolean isAnyLocation(String hostName) {
public abstract boolean getRelaxLocality();
/**
- * Set the ExecutionType of the requested container.
+ * Set the ExecutionTypeRequest of the requested container.
*
- * @param execType
- * ExecutionType of the requested container
+ * @param execSpec
+ * ExecutionTypeRequest of the requested container
*/
@Public
- @Stable
- public abstract void setExecutionType(ExecutionType execType);
+ @Evolving
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ throw new UnsupportedOperationException();
+ }
/**
* Get whether locality relaxation is enabled with this
@@ -250,8 +252,10 @@ public static boolean isAnyLocation(String hostName) {
* ResourceRequest.
*/
@Public
- @Stable
- public abstract ExecutionType getExecutionType();
+ @Evolving
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ throw new UnsupportedOperationException();
+ }
/**
*
For a request at a network hierarchy level, set whether locality can be relaxed
@@ -353,12 +357,12 @@ public boolean equals(Object obj) {
return false;
} else if (!priority.equals(other.getPriority()))
return false;
- ExecutionType executionType = getExecutionType();
- if (executionType == null) {
- if (other.getExecutionType() != null) {
+ ExecutionTypeRequest execTypeRequest = getExecutionTypeRequest();
+ if (execTypeRequest == null) {
+ if (other.getExecutionTypeRequest() != null) {
return false;
}
- } else if (executionType != other.getExecutionType()) {
+ } else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) {
return false;
}
if (getNodeLabelExpression() == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 42b5410..ca33b28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -305,7 +305,12 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
- optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
+ optional ExecutionTypeRequestProto execution_type_request = 7;
+}
+
+message ExecutionTypeRequestProto {
+ optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED];
+ optional bool enforce_execution_type = 2 [default = false];
}
enum AMCommandProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 3ec0899..035b1bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,12 +30,14 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -66,6 +69,14 @@
return client;
}
+ @VisibleForTesting
+ @Public
+ public static AMRMClient createAMRMClient(
+ ApplicationMasterProtocol protocol) {
+ AMRMClient client = new AMRMClientImpl(protocol);
+ return client;
+ }
+
private NMTokenCache nmTokenCache;
@Private
@@ -109,7 +120,7 @@ protected AMRMClient(String name) {
final Priority priority;
final boolean relaxLocality;
final String nodeLabelsExpression;
- final ExecutionType executionType;
+ final ExecutionTypeRequest executionTypeRequest;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@@ -180,7 +191,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, relaxLocality,
nodeLabelsExpression,
- ExecutionType.GUARANTEED);
+ ExecutionTypeRequest.newInstance());
}
/**
@@ -203,12 +214,12 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
- * @param executionType
+ * @param executionTypeRequest
* Set the execution type of the container request.
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression,
- ExecutionType executionType) {
+ ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
@@ -226,7 +237,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
this.priority = priority;
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
- this.executionType = executionType;
+ this.executionTypeRequest = executionTypeRequest;
}
public Resource getCapability() {
@@ -253,15 +264,16 @@ public String getNodeLabelExpression() {
return nodeLabelsExpression;
}
- public ExecutionType getExecutionType() {
- return executionType;
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ return executionTypeRequest;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
- sb.append("ExecutionType[").append(executionType).append("]");
+ sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
+ .append("]");
return sb.toString();
}
}
@@ -392,6 +404,19 @@ public abstract void requestContainerResourceChange(
Priority priority,
String resourceName,
Resource capability);
+
+ /**
+ * Performs the same function as the above but also allows the user to
+ * specify an ExecutionType .
+ * @param priority Priority
+ * @param resourceName Location
+ * @param executionType ExecutionType
+ * @param capability Capability
+ * @return Collection of request matching the parameters
+ */
+ public abstract List extends Collection> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability);
/**
* Update application's blacklist with addition or removal resources.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 286ca28..0beb693 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -154,6 +155,13 @@ public void setHeartbeatInterval(int interval) {
Resource capability) {
return client.getMatchingRequests(priority, resourceName, capability);
}
+
+ public List extends Collection> getMatchingRequests(
+ Priority priority, String resourceName, Resource capability,
+ ExecutionType executionType) {
+ return client.getMatchingRequests(priority, resourceName, executionType,
+ capability);
+ }
/**
* Registers this application master with the resource manager. On successful
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4366c25..26a9929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -54,6 +54,8 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -157,10 +159,8 @@ static boolean canFit(Resource arg0, Resource arg1) {
//Value->Map
//Key->Resource Capability
//Value->ResourceRequest
- protected final
- Map>>
- remoteRequestsTable =
- new TreeMap>>();
+ protected final Map>>> remoteRequestsTable = new TreeMap<>();
protected final Set ask = new TreeSet(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -185,6 +185,11 @@ public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
}
+ public AMRMClientImpl(ApplicationMasterProtocol protocol) {
+ super(AMRMClientImpl.class.getName());
+ this.rmClient = protocol;
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
RackResolver.init(conf);
@@ -195,8 +200,10 @@ protected void serviceInit(Configuration conf) throws Exception {
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
- rmClient =
- ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
+ if (rmClient == null) {
+ rmClient = ClientRMProxy.createRMProxy(
+ conf, ApplicationMasterProtocol.class);
+ }
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@@ -263,7 +270,8 @@ public AllocateResponse allocate(float progressIndicator)
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
- r.getRelaxLocality(), r.getNodeLabelExpression()));
+ r.getRelaxLocality(), r.getNodeLabelExpression(),
+ r.getExecutionTypeRequest()));
}
List increaseList = new ArrayList<>();
List decreaseList = new ArrayList<>();
@@ -315,11 +323,15 @@ public AllocateResponse allocate(float progressIndicator)
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
- for (Map> rr : remoteRequestsTable
- .values()) {
- for (Map capabalities : rr.values()) {
- for (ResourceRequestInfo request : capabalities.values()) {
- addResourceRequestToAsk(request.remoteRequest);
+ for (Map>> rre : remoteRequestsTable.values()) {
+ for (Map>
+ execRr : rre.values()) {
+ for (Map capabalities : execRr
+ .values()) {
+ for (ResourceRequestInfo request : capabalities.values()) {
+ addResourceRequestToAsk(request.remoteRequest);
+ }
}
}
}
@@ -517,26 +529,27 @@ public synchronized void addContainerRequest(T req) {
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
- addResourceRequest(req.getPriority(), node, req.getCapability(), req,
- true, req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), node, req.getExecutionTypeRequest(),
+ req.getCapability(), req, true, req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
- addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
- true, req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+ req.getCapability(), req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
- addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
- req.getRelaxLocality(), req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+ req.getCapability(), req, req.getRelaxLocality(),
+ req.getNodeLabelExpression());
}
-
// Off-switch
- addResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
+ addResourceRequest(req.getPriority(), ResourceRequest.ANY,
+ req.getExecutionTypeRequest(), req.getCapability(), req,
+ req.getRelaxLocality(), req.getNodeLabelExpression());
}
@Override
@@ -552,16 +565,18 @@ public synchronized void removeContainerRequest(T req) {
// Update resource requests
if (req.getNodes() != null) {
for (String node : new HashSet(req.getNodes())) {
- decResourceRequest(req.getPriority(), node, req.getCapability(), req);
+ decResourceRequest(req.getPriority(), node,
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
}
for (String rack : allRacks) {
- decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
+ decResourceRequest(req.getPriority(), rack,
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
- req.getCapability(), req);
+ req.getExecutionTypeRequest(), req.getCapability(), req);
}
@Override
@@ -601,47 +616,57 @@ public synchronized Resource getAvailableResources() {
public synchronized int getClusterNodeCount() {
return clusterNodeCount;
}
-
+
@Override
public synchronized List extends Collection> getMatchingRequests(
- Priority priority,
- String resourceName,
- Resource capability) {
+ Priority priority,
+ String resourceName,
+ Resource capability) {
+ return getMatchingRequests(priority, resourceName,
+ ExecutionType.GUARANTEED, capability);
+ }
+
+ @Override
+ public synchronized List extends Collection> getMatchingRequests(
+ Priority priority, String resourceName, ExecutionType executionType,
+ Resource capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
List> list = new LinkedList>();
- Map> remoteRequests =
- this.remoteRequestsTable.get(priority);
+ Map>>
+ remoteRequests = this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return list;
}
- TreeMap reqMap = remoteRequests
- .get(resourceName);
- if (reqMap == null) {
+ Map> reqExecMap =
+ remoteRequests.get(resourceName);
+ if (reqExecMap == null) {
return list;
}
+ TreeMap reqMap =
+ reqExecMap.get(executionType);
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo != null &&
!resourceRequestInfo.containerRequests.isEmpty()) {
list.add(resourceRequestInfo.containerRequests);
return list;
}
-
+
// no exact match. Container may be larger than what was requested.
- // get all resources <= capability. map is reverse sorted.
- SortedMap tailMap =
- reqMap.tailMap(capability);
- for(Map.Entry entry : tailMap.entrySet()) {
+ // get all resources <= capability. map is reverse sorted.
+ SortedMap tailMap =
+ reqMap.tailMap(capability);
+ for (Map.Entry entry :
+ tailMap.entrySet()) {
if (canFit(entry.getKey(), capability) &&
!entry.getValue().containerRequests.isEmpty()) {
// match found that fits in the larger resource
list.add(entry.getValue().containerRequests);
}
}
-
// no match found
return list;
}
@@ -663,23 +688,28 @@ public synchronized int getClusterNodeCount() {
return racks;
}
-
+
/**
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
private void checkLocalityRelaxationConflict(Priority priority,
Collection locations, boolean relaxLocality) {
- Map> remoteRequests =
- this.remoteRequestsTable.get(priority);
+ Map>>
+ remoteRequests = this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return;
}
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
for (String location : locations) {
- TreeMap reqs =
- remoteRequests.get(location);
+ Map> reqExecMap =
+ remoteRequests.get(location);
+ if (reqExecMap == null) {
+ // Should not happen..
+ continue;
+ }
+ for (TreeMap reqs : reqExecMap.values()) {
if (reqs != null && !reqs.isEmpty()) {
boolean existingRelaxLocality =
reqs.values().iterator().next().remoteRequest.getRelaxLocality();
@@ -687,10 +717,12 @@ private void checkLocalityRelaxationConflict(Priority priority,
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location " + location
+ " with locality relaxation " + relaxLocality + " when it has "
- + "already been requested with locality relaxation " + existingRelaxLocality);
+ + "already been requested with locality relaxation " +
+ existingRelaxLocality);
}
}
}
+ }
}
/**
@@ -747,27 +779,32 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
ask.add(remoteRequest);
}
- private void
- addResourceRequest(Priority priority, String resourceName,
- Resource capability, T req, boolean relaxLocality,
- String labelExpression) {
- Map> remoteRequests =
- this.remoteRequestsTable.get(priority);
+ private void addResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req,
+ boolean relaxLocality, String labelExpression) {
+ Map>>
+ remoteRequests = this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
- remoteRequests =
- new HashMap>();
+ remoteRequests = new HashMap<>();
this.remoteRequestsTable.put(priority, remoteRequests);
if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority);
}
}
- TreeMap reqMap =
- remoteRequests.get(resourceName);
+ Map> reqExecMap =
+ remoteRequests.get(resourceName);
+ if (reqExecMap == null) {
+ reqExecMap = new HashMap<>();
+ remoteRequests.put(resourceName, reqExecMap);
+ }
+
+ TreeMap reqMap =
+ reqExecMap.get(execTypeReq.getExecutionType());
if (reqMap == null) {
// capabilities are stored in reverse sorted order. smallest last.
reqMap = new TreeMap(
new ResourceReverseMemoryThenCpuComparator());
- remoteRequests.put(resourceName, reqMap);
+ reqExecMap.put(execTypeReq.getExecutionType(), reqMap);
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo == null) {
@@ -776,7 +813,8 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
relaxLocality);
reqMap.put(capability, resourceRequestInfo);
}
-
+
+ resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq);
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + 1);
@@ -800,12 +838,10 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
}
}
- private void decResourceRequest(Priority priority,
- String resourceName,
- Resource capability,
- T req) {
- Map> remoteRequests =
- this.remoteRequestsTable.get(priority);
+ private void decResourceRequest(Priority priority, String resourceName,
+ ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+ Map>>
+ remoteRequests = this.remoteRequestsTable.get(priority);
if(remoteRequests == null) {
if (LOG.isDebugEnabled()) {
@@ -815,14 +851,26 @@ private void decResourceRequest(Priority priority,
return;
}
- Map reqMap = remoteRequests.get(resourceName);
- if (reqMap == null) {
+ Map> reqExecMap =
+ remoteRequests.get(resourceName);
+ if (reqExecMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as " + resourceName
+ " is not present in request table");
}
return;
}
+
+ TreeMap reqMap =
+ reqExecMap.get(execTypeReq.getExecutionType());
+ if (reqMap == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not decrementing resource as " + execTypeReq
+ + " is not present in request table");
+ }
+ return;
+ }
+
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
new file mode 100644
index 0000000..0b62054
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base test case to be used for Testing frameworks that use AMRMProxy.
+ */
+public abstract class BaseAMRMProxyE2ETest {
+
+ protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+ ApplicationId appId, MiniYARNCluster cluster,
+ final Configuration yarnConf)
+ throws IOException, InterruptedException, YarnException {
+
+ UserGroupInformation user = null;
+
+ // Get the AMRMToken from AMRMProxy
+
+ ApplicationReport report = rmClient.getApplicationReport(appId);
+
+ user = UserGroupInformation.createProxyUser(
+ report.getCurrentApplicationAttemptId().toString(),
+ UserGroupInformation.getCurrentUser());
+
+ ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
+ .getNodeManager(0).getNMContext().getContainerManager();
+
+ AMRMProxyTokenSecretManager amrmTokenSecretManager =
+ containerManager.getAMRMProxyService().getSecretManager();
+ org.apache.hadoop.security.token.Token token =
+ amrmTokenSecretManager
+ .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
+
+ SecurityUtil.setTokenService(token,
+ containerManager.getAMRMProxyService().getBindAddress());
+ user.addToken(token);
+
+ // Start Application Master
+
+ return user
+ .doAs(new PrivilegedExceptionAction() {
+ @Override
+ public ApplicationMasterProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(yarnConf,
+ ApplicationMasterProtocol.class);
+ }
+ });
+ }
+
+ protected AllocateRequest createAllocateRequest(List listNode) {
+ // The test needs AMRMClient to create a real allocate request
+ AMRMClientImpl amClient =
+ new AMRMClientImpl<>();
+
+ Resource capability = Resource.newInstance(1024, 2);
+ Priority priority = Priority.newInstance(1);
+ List nodeReports = listNode;
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String[] nodes = new String[] {node};
+
+ AMRMClient.ContainerRequest storedContainer1 =
+ new AMRMClient.ContainerRequest(capability, nodes, null, priority);
+ amClient.addContainerRequest(storedContainer1);
+ amClient.addContainerRequest(storedContainer1);
+
+ List resourceAsk = new ArrayList<>();
+ for (ResourceRequest rr : amClient.ask) {
+ resourceAsk.add(rr);
+ }
+
+ ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
+ .newInstance(new ArrayList<>(), new ArrayList<>());
+
+ int responseId = 1;
+
+ return AllocateRequest.newInstance(responseId, 0, resourceAsk,
+ new ArrayList<>(), resourceBlacklistRequest);
+ }
+
+ protected ApplicationAttemptId createApp(YarnClient yarnClient,
+ MiniYARNCluster yarnCluster, Configuration conf) throws Exception {
+
+ ApplicationSubmissionContext appContext =
+ yarnClient.createApplication().getApplicationSubmissionContext();
+ ApplicationId appId = appContext.getApplicationId();
+
+ appContext.setApplicationName("Test");
+
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+
+ appContext.setQueue("default");
+
+ ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+ Collections. emptyMap(),
+ new HashMap(), Arrays.asList("sleep", "10000"),
+ new HashMap(), null,
+ new HashMap());
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(Resource.newInstance(1024, 1));
+
+ SubmitApplicationRequest appRequest =
+ Records.newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+
+ yarnClient.submitApplication(appContext);
+
+ RMAppAttempt appAttempt = null;
+ ApplicationAttemptId attemptId = null;
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport
+ .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ attemptId =
+ appReport.getCurrentApplicationAttemptId();
+ appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+ .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+ while (true) {
+ if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+ break;
+ }
+ }
+ break;
+ }
+ }
+ Thread.sleep(1000);
+ // Just dig into the ResourceManager and get the AMRMToken just for the sake
+ // of testing.
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+ // emulate RM setup of AMRM token in credentials by adding the token
+ // *before* setting the token service
+ UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+ appAttempt.getAMRMToken().setService(
+ ClientRMProxy.getAMRMTokenService(conf));
+ return attemptId;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 75b49d0..d328518 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -414,10 +415,12 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED)
+ .get(capability).remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED)
+ .get(capability).remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
List extends Collection> matches =
amClient.getMatchingRequests(priority, node, capability);
@@ -920,11 +923,14 @@ private void testAllocation(final AMRMClientImpl amClient)
new ContainerRequest(capability, nodes, racks, priority));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
- .get(node).get(capability).remoteRequest.getNumContainers();
+ .get(node).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
- .get(rack).get(capability).remoteRequest.getNumContainers();
+ .get(rack).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+ .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index cb8c86a..163c665 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -35,6 +37,46 @@
import org.junit.Test;
public class TestAMRMClientContainerRequest {
+
+ @Test
+ public void testOpportunisticAndGuaranteedRequests() {
+ AMRMClientImpl client =
+ new AMRMClientImpl();
+
+ Configuration conf = new Configuration();
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ client.init(conf);
+
+ Resource capability = Resource.newInstance(1024, 1);
+ ContainerRequest request =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ new String[] {"/rack2"}, Priority.newInstance(1));
+ client.addContainerRequest(request);
+ verifyResourceRequest(client, request, "host1", true);
+ verifyResourceRequest(client, request, "host2", true);
+ verifyResourceRequest(client, request, "/rack1", true);
+ verifyResourceRequest(client, request, "/rack2", true);
+ verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+ ContainerRequest request2 =
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
+ new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ client.addContainerRequest(request2);
+ verifyResourceRequest(client, request, "host1", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request, "host2", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request, "/rack1", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request, "/rack2", true,
+ ExecutionType.OPPORTUNISTIC);
+ verifyResourceRequest(client, request, ResourceRequest.ANY, true,
+ ExecutionType.OPPORTUNISTIC);
+ }
+
@Test
public void testFillInRacks() {
AMRMClientImpl client =
@@ -224,8 +266,17 @@ public void reloadCachedMappings(List names) {
private void verifyResourceRequest(
AMRMClientImpl client, ContainerRequest request,
String location, boolean expectedRelaxLocality) {
- ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
- .get(location).get(request.getCapability()).remoteRequest;
+ verifyResourceRequest(client, request, location, expectedRelaxLocality,
+ ExecutionType.GUARANTEED);
+ }
+
+ private void verifyResourceRequest(
+ AMRMClientImpl client, ContainerRequest request,
+ String location, boolean expectedRelaxLocality,
+ ExecutionType executionType) {
+ ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
+ .get(location).get(executionType).get(request.getCapability())
+ .remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
index f1e3f03..c938bc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -19,20 +19,12 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -40,43 +32,22 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
-public class TestAMRMProxy {
+public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
@@ -84,7 +55,7 @@
* This test validates register, allocate and finish of an application through
* the AMRMPRoxy.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testAMRMProxyE2E() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
YarnClient rmClient = null;
@@ -107,7 +78,8 @@ public void testAMRMProxyE2E() throws Exception {
// Submit application
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -173,7 +145,7 @@ public void testAMRMProxyE2E() throws Exception {
* that the received token it is different from the previous one within 5
* requests.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
@@ -201,7 +173,8 @@ public void testE2ETokenRenewal() throws Exception {
// Submit
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -252,7 +225,7 @@ public void testE2ETokenRenewal() throws Exception {
* This test validates that an AM cannot register directly to the RM, with the
* token provided by the AMRMProxy.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 120000)
public void testE2ETokenSwap() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
YarnClient rmClient = null;
@@ -270,7 +243,8 @@ public void testE2ETokenSwap() throws Exception {
rmClient.init(yarnConf);
rmClient.start();
- ApplicationId appId = createApp(rmClient, cluster);
+ ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+ ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@@ -290,124 +264,4 @@ public void testE2ETokenSwap() throws Exception {
cluster.stop();
}
}
-
- protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
- ApplicationId appId, MiniYARNCluster cluster,
- final Configuration yarnConf)
- throws IOException, InterruptedException, YarnException {
-
- UserGroupInformation user = null;
-
- // Get the AMRMToken from AMRMProxy
-
- ApplicationReport report = rmClient.getApplicationReport(appId);
-
- user = UserGroupInformation.createProxyUser(
- report.getCurrentApplicationAttemptId().toString(),
- UserGroupInformation.getCurrentUser());
-
- ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
- .getNodeManager(0).getNMContext().getContainerManager();
-
- AMRMProxyTokenSecretManager amrmTokenSecretManager =
- containerManager.getAMRMProxyService().getSecretManager();
- org.apache.hadoop.security.token.Token token =
- amrmTokenSecretManager
- .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
-
- SecurityUtil.setTokenService(token,
- containerManager.getAMRMProxyService().getBindAddress());
- user.addToken(token);
-
- // Start Application Master
-
- return user
- .doAs(new PrivilegedExceptionAction() {
- @Override
- public ApplicationMasterProtocol run() throws Exception {
- return ClientRMProxy.createRMProxy(yarnConf,
- ApplicationMasterProtocol.class);
- }
- });
- }
-
- protected AllocateRequest createAllocateRequest(List listNode) {
- // The test needs AMRMClient to create a real allocate request
- AMRMClientImpl amClient =
- new AMRMClientImpl();
-
- Resource capability = Resource.newInstance(1024, 2);
- Priority priority = Priority.newInstance(1);
- List nodeReports = listNode;
- String node = nodeReports.get(0).getNodeId().getHost();
- String[] nodes = new String[] { node };
-
- ContainerRequest storedContainer1 =
- new ContainerRequest(capability, nodes, null, priority);
- amClient.addContainerRequest(storedContainer1);
- amClient.addContainerRequest(storedContainer1);
-
- List resourceAsk = new ArrayList();
- for (ResourceRequest rr : amClient.ask) {
- resourceAsk.add(rr);
- }
-
- ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
- .newInstance(new ArrayList(), new ArrayList());
-
- int responseId = 1;
-
- return AllocateRequest.newInstance(responseId, 0, resourceAsk,
- new ArrayList(), resourceBlacklistRequest);
- }
-
- protected ApplicationId createApp(YarnClient yarnClient,
- MiniYARNCluster yarnCluster) throws Exception {
-
- ApplicationSubmissionContext appContext =
- yarnClient.createApplication().getApplicationSubmissionContext();
- ApplicationId appId = appContext.getApplicationId();
-
- appContext.setApplicationName("Test");
-
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(0);
- appContext.setPriority(pri);
-
- appContext.setQueue("default");
-
- ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
- Collections. emptyMap(),
- new HashMap(), Arrays.asList("sleep", "10000"),
- new HashMap(), null,
- new HashMap());
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(Resource.newInstance(1024, 1));
-
- SubmitApplicationRequest appRequest =
- Records.newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
-
- yarnClient.submitApplication(appContext);
-
- RMAppAttempt appAttempt = null;
- while (true) {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (appReport
- .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
- ApplicationAttemptId attemptId =
- appReport.getCurrentApplicationAttemptId();
- appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
- while (true) {
- if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- break;
- }
- }
- break;
- }
- }
- Thread.sleep(1000);
- return appId;
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index b4dcf66..af06cd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,19 +22,31 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -42,12 +54,23 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
@@ -56,11 +79,70 @@
* the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingService running on the RM.
*/
-public class TestDistributedScheduling extends TestAMRMProxy {
+public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
private static final Log LOG =
LogFactory.getLog(TestDistributedScheduling.class);
+ protected MiniYARNCluster cluster;
+ protected YarnClient rmClient;
+ protected ApplicationMasterProtocol client;
+ protected Configuration conf;
+ protected Configuration yarnConf;
+ protected ApplicationAttemptId attemptId;
+ protected ApplicationId appId;
+
+ @Before
+ public void doBefore() throws Exception {
+ cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+ cluster.init(conf);
+ cluster.start();
+ yarnConf = cluster.getConfig();
+
+ // the client has to connect to AMRMProxy
+ yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+ rmClient = YarnClient.createYarnClient();
+ rmClient.init(yarnConf);
+ rmClient.start();
+
+ // Submit application
+ attemptId = createApp(rmClient, cluster, conf);
+ appId = attemptId.getApplicationId();
+ client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+ }
+
+ @After
+ public void doAfter() throws Exception {
+ if (client != null) {
+ try {
+ client.finishApplicationMaster(FinishApplicationMasterRequest
+ .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+ rmClient.killApplication(attemptId.getApplicationId());
+ attemptId = null;
+ } catch (Exception e) {
+ }
+ }
+ if (rmClient != null) {
+ try {
+ rmClient.stop();
+ } catch (Exception e) {
+ }
+ }
+ if (cluster != null) {
+ try {
+ cluster.stop();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+
/**
* Validates if Allocate Requests containing only OPPORTUNISTIC container
* requests are satisfied instantly.
@@ -69,102 +151,63 @@
*/
@Test(timeout = 60000)
public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
- MiniYARNCluster cluster =
- new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
- YarnClient rmClient = null;
- ApplicationMasterProtocol client;
-
- try {
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
- conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
- cluster.init(conf);
- cluster.start();
- final Configuration yarnConf = cluster.getConfig();
-
- // the client has to connect to AMRMProxy
-
- yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
- rmClient.init(yarnConf);
- rmClient.start();
-
- // Submit application
-
- ApplicationId appId = createApp(rmClient, cluster);
-
- client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
- // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
- // everything else
- List newAskList = new ArrayList<>();
- for (ResourceRequest rr : request.getAskList()) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
- newAskList.add(newRR);
- }
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
- }
-
- LOG.info("testDistributedSchedulingE2E - Finish");
-
- FinishApplicationMasterResponse responseFinish =
- client.finishApplicationMaster(FinishApplicationMasterRequest
- .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-
- Assert.assertNotNull(responseFinish);
-
- } finally {
- if (rmClient != null) {
- rmClient.stop();
+ LOG.info("testDistributedSchedulingE2E - Register");
+
+ RegisterApplicationMasterResponse responseRegister =
+ client.registerApplicationMaster(RegisterApplicationMasterRequest
+ .newInstance(NetUtils.getHostname(), 1024, ""));
+
+ Assert.assertNotNull(responseRegister);
+ Assert.assertNotNull(responseRegister.getQueue());
+ Assert.assertNotNull(responseRegister.getApplicationACLs());
+ Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+ Assert
+ .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+ Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+ Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+ RMApp rmApp =
+ cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+ LOG.info("testDistributedSchedulingE2E - Allocate");
+
+ AllocateRequest request =
+ createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+ // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
+ // everything else
+ List newAskList = new ArrayList<>();
+ for (ResourceRequest rr : request.getAskList()) {
+ if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+ ResourceRequest newRR = ResourceRequest.newInstance(rr
+ .getPriority(), rr.getResourceName(),
+ rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ newAskList.add(newRR);
}
- cluster.stop();
}
+ request.setAskList(newAskList);
+
+ AllocateResponse allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+
+ // Ensure that all the requests are satisfied immediately
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are OPPORTUNISTIC
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ containerTokenIdentifier.getExecutionType());
+ }
+
+ LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
@@ -175,133 +218,305 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
*/
@Test(timeout = 60000)
public void testMixedExecutionTypeRequestE2E() throws Exception {
- MiniYARNCluster cluster =
- new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
- YarnClient rmClient = null;
- ApplicationMasterProtocol client;
-
- try {
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
- conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
- cluster.init(conf);
- cluster.start();
- final Configuration yarnConf = cluster.getConfig();
-
- // the client has to connect to AMRMProxy
-
- yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
- rmClient.init(yarnConf);
- rmClient.start();
-
- // Submit application
-
- ApplicationId appId = createApp(rmClient, cluster);
-
- client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
- List askList = request.getAskList();
- List newAskList = new ArrayList<>(askList);
-
- // Duplicate all ANY requests marking them as opportunistic
- for (ResourceRequest rr : askList) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
- newAskList.add(newRR);
- }
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
+ LOG.info("testDistributedSchedulingE2E - Register");
+
+ RegisterApplicationMasterResponse responseRegister =
+ client.registerApplicationMaster(RegisterApplicationMasterRequest
+ .newInstance(NetUtils.getHostname(), 1024, ""));
+
+ Assert.assertNotNull(responseRegister);
+ Assert.assertNotNull(responseRegister.getQueue());
+ Assert.assertNotNull(responseRegister.getApplicationACLs());
+ Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+ Assert
+ .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+ Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+ Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+ RMApp rmApp =
+ cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+ LOG.info("testDistributedSchedulingE2E - Allocate");
+
+ AllocateRequest request =
+ createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+ List askList = request.getAskList();
+ List newAskList = new ArrayList<>(askList);
+
+ // Duplicate all ANY requests marking them as opportunistic
+ for (ResourceRequest rr : askList) {
+ if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+ ResourceRequest newRR = ResourceRequest.newInstance(rr
+ .getPriority(), rr.getResourceName(),
+ rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ newAskList.add(newRR);
}
+ }
+ request.setAskList(newAskList);
+
+ AllocateResponse allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+
+ // Ensure that all the requests are satisfied immediately
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are OPPORTUNISTIC
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ containerTokenIdentifier.getExecutionType());
+ }
- request.setAskList(new ArrayList());
- request.setResponseId(request.getResponseId() + 1);
+ request.setAskList(new ArrayList());
+ request.setResponseId(request.getResponseId() + 1);
- Thread.sleep(1000);
+ Thread.sleep(1000);
- // RM should allocate GUARANTEED containers within 2 calls to allocate()
- allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+ // RM should allocate GUARANTEED containers within 2 calls to allocate()
+ allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are GUARANTEED
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.GUARANTEED,
+ containerTokenIdentifier.getExecutionType());
+ }
+
+ LOG.info("testDistributedSchedulingE2E - Finish");
+ }
+
+ /**
+ * Validates if AMRMClient can be used with Distributed Scheduling turned on
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 1200000)
+ @SuppressWarnings("unchecked")
+ public void testAMRMClient() throws Exception {
+ AMRMClientImpl amClient = null;
+ try {
+ Priority priority = Priority.newInstance(1);
+ Priority priority2 = Priority.newInstance(2);
+ Resource capability = Resource.newInstance(1024, 1);
+
+ List nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String rack = nodeReports.get(0).getRackName();
+ String[] nodes = new String[]{node};
+ String[] racks = new String[]{rack};
+
+ // start am rm client
+ amClient = (AMRMClientImpl) AMRMClient.createAMRMClient(client);
+ amClient.init(yarnConf);
+ amClient.start();
+ amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+ .get(node).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
+ int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+ .get(rack).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+ .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED)
+ .get(capability).remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.remoteRequestsTable.get(priority2).get(ResourceRequest.ANY)
+ .get(ExecutionType.OPPORTUNISTIC).get(capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(2, containersRequestedNode);
+ assertEquals(2, containersRequestedRack);
+ assertEquals(2, containersRequestedAny);
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 10;
+ Set releases = new TreeSet<>();
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap receivedNMTokens = new HashMap<>();
+
+ while (allocatedContainerCount <
+ (containersRequestedAny + oppContainersRequestedAny)
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount += allocResponse.getAllocatedContainers()
+ .size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ }
- // Verify that the allocated containers are GUARANTEED
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.GUARANTEED,
- containerTokenIdentifier.getExecutionType());
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
}
- LOG.info("testDistributedSchedulingE2E - Finish");
+ assertEquals(allocatedContainerCount,
+ containersRequestedAny + oppContainersRequestedAny);
+ for (ContainerId rejectContainerId : releases) {
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ assertEquals(3, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+
+ // need to tell the AMRMClient that we dont need these resources anymore
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ assertEquals(4, amClient.ask.size());
+
+ // test RPC exception handling
+ amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+ nodes, racks, priority));
+ amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+ nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ final AMRMClient amc = amClient;
+ ApplicationMasterProtocol realRM = amClient.rmClient;
+ try {
+ ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
+ .class);
+ when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+ new Answer() {
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Exception {
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes,
+ racks, priority));
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks,
+ priority));
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null,
+ priority2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ throw new Exception();
+ }
+ });
+ amClient.rmClient = mockRM;
+ amClient.allocate(0.1f);
+ } catch (Exception ioe) {
+ } finally {
+ amClient.rmClient = realRM;
+ }
- FinishApplicationMasterResponse responseFinish =
- client.finishApplicationMaster(FinishApplicationMasterRequest
- .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+ assertEquals(3, amClient.release.size());
+ assertEquals(6, amClient.ask.size());
+
+ iterationsLeft = 3;
+ // do a few iterations to ensure RM is not going send new containers
+ while (iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
+ if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for (ContainerStatus cStatus : allocResponse
+ .getCompletedContainersStatuses()) {
+ if (releases.contains(cStatus.getContainerId())) {
+ assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+ assertEquals(-100, cStatus.getExitStatus());
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- Assert.assertNotNull(responseFinish);
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
} finally {
- if (rmClient != null) {
- rmClient.stop();
+ if (amClient != null && amClient.getServiceState() == Service.STATE
+ .STARTED) {
+ amClient.stop();
}
- cluster.stop();
}
}
- @Ignore
- @Override
- public void testAMRMProxyE2E() throws Exception { }
-
- @Ignore
- @Override
- public void testE2ETokenRenewal() throws Exception { }
-
- @Ignore
- @Override
- public void testE2ETokenSwap() throws Exception { }
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index cd04130..ec78eda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -252,8 +253,8 @@ public void testNMClient()
}
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest
- .getNumContainers();
+ .get(ResourceRequest.ANY).get(ExecutionType.GUARANTEED).get(capability)
+ .remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
new file mode 100644
index 0000000..0037dd3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProtoOrBuilder;
+
+/**
+ * Implementation of ExecutionTypeRequest.
+ */
+public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
+ private ExecutionTypeRequestProto proto =
+ ExecutionTypeRequestProto.getDefaultInstance();
+ private ExecutionTypeRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public ExecutionTypeRequestPBImpl() {
+ builder = ExecutionTypeRequestProto.newBuilder();
+ }
+
+ public ExecutionTypeRequestPBImpl(ExecutionTypeRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ExecutionTypeRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ public ExecutionTypeRequestProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ExecutionType getExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasExecutionType()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType execType) {
+ maybeInitBuilder();
+ if (execType == null) {
+ builder.clearExecutionType();
+ return;
+ }
+ builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
+ }
+
+ @Override
+ public void setEnforceExecutionType(boolean enforceExecutionType) {
+ maybeInitBuilder();
+ builder.setEnforceExecutionType(enforceExecutionType);
+ }
+
+ @Override
+ public boolean getEnforceExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEnforceExecutionType();
+ }
+
+ @Override
+ public String toString() {
+ return "{Execution Type: " + getExecutionType()
+ + ", Enforce Execution Type: " + getEnforceExecutionType() + "}";
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 236df90..1a0f30a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -324,4 +326,17 @@ public static ContainerRetryPolicy convertFromProtoFormat(
ContainerRetryPolicyProto e) {
return ContainerRetryPolicy.valueOf(e.name());
}
+
+ /*
+ * ExecutionTypeRequest
+ */
+ public static ExecutionTypeRequestProto convertToProtoFormat(
+ ExecutionTypeRequest e) {
+ return ((ExecutionTypeRequestPBImpl)e).getProto();
+ }
+
+ public static ExecutionTypeRequest convertFromProtoFormat(
+ ExecutionTypeRequestProto e) {
+ return new ExecutionTypeRequestPBImpl(e);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 53ae2cd..b0c4b97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,6 +39,7 @@
private Priority priority = null;
private Resource capability = null;
+ private ExecutionTypeRequest executionTypeRequest = null;
public ResourceRequestPBImpl() {
@@ -64,6 +65,10 @@ private void mergeLocalToBuilder() {
if (this.capability != null) {
builder.setCapability(convertToProtoFormat(this.capability));
}
+ if (this.executionTypeRequest != null) {
+ builder.setExecutionTypeRequest(
+ ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
+ }
}
private void mergeLocalToProto() {
@@ -102,6 +107,29 @@ public void setPriority(Priority priority) {
builder.clearPriority();
this.priority = priority;
}
+
+
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.executionTypeRequest != null) {
+ return this.executionTypeRequest;
+ }
+ if (!p.hasExecutionTypeRequest()) {
+ return null;
+ }
+ this.executionTypeRequest =
+ ProtoUtils.convertFromProtoFormat(p.getExecutionTypeRequest());
+ return this.executionTypeRequest;
+ }
+
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ maybeInitBuilder();
+ if (execSpec == null) {
+ builder.clearExecutionTypeRequest();
+ }
+ this.executionTypeRequest = execSpec;
+ }
+
@Override
public String getResourceName() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -186,6 +214,7 @@ public String toString() {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
+ + ", Execution Type Request: " + getExecutionTypeRequest()
+ ", Node Label Expression: " + getNodeLabelExpression() + "}";
}
@@ -207,24 +236,4 @@ public void setNodeLabelExpression(String nodeLabelExpression) {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
-
- @Override
- public ExecutionType getExecutionType() {
- ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasExecutionType()) {
- return null;
- }
- return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
- }
-
- @Override
- public void setExecutionType(ExecutionType execType) {
- maybeInitBuilder();
- if (execType == null) {
- builder.clearExecutionType();
- return;
- }
- builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
- }
-
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 14f61b7..366b547 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -123,6 +123,8 @@
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -463,6 +465,7 @@ public static void setup() throws Exception {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(ExecutionTypeRequest.class);
generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index fca814b..8e2ceb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -214,7 +214,8 @@ private PartitionedResourceRequests partitionAskList(List
PartitionedResourceRequests partitionedRequests =
new PartitionedResourceRequests();
for (ResourceRequest rr : askList) {
- if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ if (rr.getExecutionTypeRequest().getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
partitionedRequests.getOpportunistic().add(rr);
} else {
partitionedRequests.getGuaranteed().add(rr);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index e987e79..a1d39f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
@@ -138,13 +138,15 @@ public DistSchedAllocateResponse answer(InvocationOnMock
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
- guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
+ guaranteedReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
guaranteedReq.setNumContainers(5);
guaranteedReq.setCapability(Resource.newInstance(2048, 2));
guaranteedReq.setRelaxLocality(true);
guaranteedReq.setResourceName("*");
ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
- opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(4);
opportunisticReq.setCapability(Resource.newInstance(1024, 4));
opportunisticReq.setPriority(Priority.newInstance(100));
@@ -167,7 +169,8 @@ public DistSchedAllocateResponse answer(InvocationOnMock
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = Records.newRecord(ResourceRequest.class);
- opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(6);
opportunisticReq.setCapability(Resource.newInstance(512, 3));
opportunisticReq.setPriority(Priority.newInstance(100));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 5d5ab78..7d2ed33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -39,8 +39,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -104,6 +107,13 @@ public Configuration getYarnConfiguration() {
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
+ AllocateRequest allReq =
+ (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+ allReq.setAskList(Arrays.asList(
+ ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+ Resource.newInstance(1, 2), 1, true, "exp",
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -168,8 +178,7 @@ public Configuration getYarnConfiguration() {
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)factory
- .newRecordInstance(AllocateRequest.class)).getProto()));
+ ((AllocateRequestPBImpl)allReq).getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -235,6 +244,10 @@ public AllocateResponse allocate(AllocateRequest request) throws
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
+ List askList = request.getAskList();
+ Assert.assertEquals(1, askList.size());
+ Assert.assertTrue(askList.get(0)
+ .getExecutionTypeRequest().getEnforceExecutionType());
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
resp.setNodesForScheduling(