diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index a556aa2..c816917 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -53,6 +53,8 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
@@ -207,6 +209,17 @@ public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
containerTokenIdentifier.getExecutionType());
}
+ // Check that the RM sees OPPORTUNISTIC containers
+ ResourceScheduler scheduler = cluster.getResourceManager()
+ .getResourceScheduler();
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerId containerId = allocatedContainer.getId();
+ RMContainer rmContainer = scheduler.getRMContainer(containerId);
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ rmContainer.getExecutionType());
+ }
+
LOG.info("testDistributedSchedulingE2E - Finish");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 1a0f30a..4b62358 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@@ -339,4 +340,17 @@ public static ExecutionTypeRequest convertFromProtoFormat(
ExecutionTypeRequestProto e) {
return new ExecutionTypeRequestPBImpl(e);
}
+
+ /*
+ * Container
+ */
+ public static YarnProtos.ContainerProto convertToProtoFormat(
+ Container t) {
+ return ((ContainerPBImpl)t).getProto();
+ }
+
+ public static ContainerStatusPBImpl convertFromProtoFormat(
+ YarnProtos.ContainerStatusProto p) {
+ return new ContainerStatusPBImpl(p);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
index 490c25b..26faa8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -22,7 +22,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -74,5 +74,5 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
@Unstable
@Idempotent
DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException;
+ DistSchedAllocateRequest request) throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
index c1dd9e5..0ca61df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -22,9 +22,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -35,6 +37,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,9 +80,9 @@ public void close() {
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@@ -93,10 +96,10 @@ public void close() {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- YarnServiceProtos.AllocateRequestProto requestProto =
- ((AllocateRequestPBImpl) request).getProto();
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
+ ((DistSchedAllocateRequestPBImpl) request).getProto();
try {
return new DistSchedAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
index 8be2893..2763259 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,8 +78,10 @@ public DistributedSchedulerProtocolPBServiceImpl(
@Override
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
- AllocateRequestProto proto) throws ServiceException {
- AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+ throws ServiceException {
+ DistSchedAllocateRequestPBImpl request =
+ new DistSchedAllocateRequestPBImpl(proto);
try {
DistSchedAllocateResponse response = real
.allocateForDistributedScheduling(request);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
new file mode 100644
index 0000000..afac25e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+
+@Public
+@Evolving
+public abstract class DistSchedAllocateRequest {
+
+ /**
+ * Get the underlying AllocateRequest object.
+ * @return Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract AllocateRequest getAllocateRequest();
+
+ /**
+ * Set the underlying AllocateRequest object.
+ * @param allocateRequest Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocateRequest(AllocateRequest allocateRequest);
+
+ /**
+ * Get the list of newly allocated Container by the
+ * Distributed Scheduling component on the NodeManager.
+ * @return list of newly allocated Container
+ */
+ @Public
+ @Evolving
+ public abstract List getAllocatedContainers();
+
+ /**
+ * Set the list of newly allocated Container by the
+ * Distributed Scheduling component on the NodeManager.
+ * @param containers list of newly allocated Container
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocatedContainers(List containers);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
new file mode 100644
index 0000000..b809b76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
+ private DistSchedAllocateRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private DistSchedAllocateRequestProto proto;
+ private AllocateRequest allocateRequest;
+ private List containers;
+
+ public DistSchedAllocateRequestPBImpl() {
+ builder = DistSchedAllocateRequestProto.newBuilder();
+ }
+
+ public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+ this.proto = proto;
+ this.viaProto = true;
+ }
+
+ @Override
+ public AllocateRequest getAllocateRequest() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.allocateRequest != null) {
+ return this.allocateRequest;
+ }
+ if (!p.hasAllocateRequest()) {
+ return null;
+ }
+ this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest());
+ return this.allocateRequest;
+ }
+
+ @Override
+ public void setAllocateRequest(AllocateRequest allocateRequest) {
+ maybeInitBuilder();
+ if (allocateRequest == null) {
+ builder.clearAllocateRequest();
+ }
+ this.allocateRequest = allocateRequest;
+ }
+
+ @Override
+ public List getAllocatedContainers() {
+ if (this.containers != null) {
+ return this.containers;
+ }
+ initAllocatedContainers();
+ return containers;
+ }
+
+ private void initAllocatedContainers() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getAllocatedContainersList();
+ this.containers = new ArrayList();
+ for (ContainerProto c : list) {
+ this.containers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setAllocatedContainers(List containers) {
+ maybeInitBuilder();
+ if (containers == null || containers.isEmpty()) {
+ if (this.containers != null) {
+ this.containers.clear();
+ }
+ builder.clearAllocatedContainers();
+ return;
+ }
+ this.containers = new ArrayList<>();
+ this.containers.addAll(containers);
+ }
+
+ public DistSchedAllocateRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DistSchedAllocateRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containers != null) {
+ builder.clearAllocatedContainers();
+ Iterable iterable =
+ getContainerProtoIterable(this.containers);
+ builder.addAllAllocatedContainers(iterable);
+ }
+ if (this.allocateRequest != null) {
+ builder.setAllocateRequest(
+ ((AllocateRequestPBImpl)this.allocateRequest).getProto());
+ }
+ }
+
+ private Iterable getContainerProtoIterable(
+ final List newContainersList) {
+ maybeInitBuilder();
+ return new Iterable() {
+ @Override
+ public synchronized Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = newContainersList.iterator();
+
+ @Override
+ public synchronized boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public synchronized ContainerProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public synchronized void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+ }
+ };
+ }
+
+ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+ return new ContainerPBImpl(p);
+ }
+
+ private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) {
+ return new AllocateRequestPBImpl(p);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
index b94656c..818eb4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
- rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+ rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a7e5a86..3e3cb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto {
repeated NodeIdProto nodes_for_scheduling = 2;
}
+message DistSchedAllocateRequestProto {
+ optional AllocateRequestProto allocate_request = 1;
+ repeated ContainerProto allocated_containers = 2;
+}
+
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 2b2a2f6..55c65f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,10 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
@@ -118,8 +118,8 @@ public AMRMProxyApplicationContext getApplicationContext() {
* @throws IOException
*/
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@@ -135,9 +135,9 @@ public AMRMProxyApplicationContext getApplicationContext() {
*/
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 1637682..debff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,8 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request)
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
@@ -212,9 +213,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedAllocateResponse
- allocateForDistributedScheduling(AllocateRequest request) throws
- YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request)
+ throws YarnException, IOException {
throw new IOException("Not Supported !!");
}
};
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 8e2ceb0..a88b9f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -21,6 +21,9 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -99,6 +102,9 @@
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
+ private final static RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
// Currently just used to keep track of allocated Containers
// Can be used for reporting stats later
private Set containersAllocated = new HashSet<>();
@@ -176,7 +182,10 @@ void initLocal(ApplicationAttemptId applicationAttemptId,
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- return allocateForDistributedScheduling(request).getAllocateResponse();
+ DistSchedAllocateRequest distRequest =
+ recordFactory.newRecordInstance(DistSchedAllocateRequest.class);
+ distRequest.setAllocateRequest(request);
+ return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@Override
@@ -324,9 +333,9 @@ private void addToNodeList(List nodes) {
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
DistSchedRegisterResponse dsResp = getNextInterceptor()
@@ -336,17 +345,18 @@ private void addToNodeList(List nodes) {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks = partitionAskList(request
- .getAskList());
+ PartitionedResourceRequests partitionedAsks = partitionAskList(
+ request.getAllocateRequest().getAskList());
- List releasedContainers = request.getReleaseList();
+ List releasedContainers =
+ request.getAllocateRequest().getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
@@ -355,7 +365,8 @@ private void addToNodeList(List nodes) {
}
// Also, update black list
- ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+ ResourceBlacklistRequest rbr =
+ request.getAllocateRequest().getResourceBlacklistRequest();
if (rbr != null) {
blacklist.removeAll(rbr.getBlacklistRemovals());
blacklist.addAll(rbr.getBlacklistAdditions());
@@ -381,9 +392,10 @@ private void addToNodeList(List nodes) {
allocatedContainers.addAll(e.getValue());
}
}
+ request.setAllocatedContainers(allocatedContainers);
// Send all the GUARANTEED Reqs to RM
- request.setAskList(partitionedAsks.getGuaranteed());
+ request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistSchedAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index a1d39f7..31f8085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -126,7 +127,7 @@ public void setBytes(ByteBuffer bytes) {}
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
- Mockito.any(AllocateRequest.class)))
+ Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index a93f683..5aabddc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
@@ -32,6 +33,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -45,6 +47,12 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@@ -60,6 +68,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -229,9 +238,23 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- AllocateResponse response = allocate(request);
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List distAllocContainers = request.getAllocatedContainers();
+ for (Container container : distAllocContainers) {
+ // Create RMContainer
+ SchedulerApplicationAttempt appAttempt =
+ ((AbstractYarnScheduler) rmContext.getScheduler())
+ .getCurrentAttemptForContainer(container.getId());
+ RMContainer rmContainer = new RMContainerImpl(container,
+ appAttempt.getApplicationAttemptId(), container.getNodeId(),
+ appAttempt.getUser(), rmContext, true);
+ appAttempt.addRMContainer(container.getId(), rmContainer);
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(),
+ RMContainerEventType.LAUNCHED));
+ }
+ AllocateResponse response = allocate(request.getAllocateRequest());
DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
(DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f37923f..a1826bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -91,4 +92,8 @@
void cancelIncreaseReservation();
String getQueueName();
+
+ ExecutionType getExecutionType();
+
+ boolean isExternallyAllocated();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 95f81d4..21bf124 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,6 +80,8 @@
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
+ RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.NEW,
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
@@ -183,6 +186,8 @@
private Resource lastConfirmedResource;
private volatile String queueName;
+ private boolean isExternallyAllocated;
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@@ -190,6 +195,13 @@ public RMContainerImpl(Container container,
.currentTimeMillis(), "");
}
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, boolean isExternallyAllocated) {
+ this(container, appAttemptId, nodeId, user, rmContext, System
+ .currentTimeMillis(), "", isExternallyAllocated);
+ }
+
private boolean saveNonAMContainerMetaInfo;
public RMContainerImpl(Container container,
@@ -202,6 +214,14 @@ public RMContainerImpl(Container container,
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
+ this(container, appAttemptId, nodeId, user, rmContext, creationTime,
+ nodeLabelExpression, false);
+ }
+
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, long creationTime, String nodeLabelExpression,
+ boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
@@ -216,6 +236,7 @@ public RMContainerImpl(Container container,
this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
this.lastConfirmedResource = container.getResource();
+ this.isExternallyAllocated = isExternallyAllocated;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -827,4 +848,14 @@ public void setQueueName(String queueName) {
public String getQueueName() {
return queueName;
}
+
+ @Override
+ public ExecutionType getExecutionType() {
+ return container.getExecutionType();
+ }
+
+ @Override
+ public boolean isExternallyAllocated() {
+ return isExternallyAllocated;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 3066339..31ba72e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -516,7 +516,23 @@ public void completedContainer(RMContainer rmContainer,
return;
}
- completedContainerInternal(rmContainer, containerStatus, event);
+ if (!rmContainer.isExternallyAllocated()) {
+ completedContainerInternal(rmContainer, containerStatus, event);
+ } else {
+ ContainerId containerId = rmContainer.getContainerId();
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerFinishedEvent(containerId, containerStatus, event));
+ SchedulerApplicationAttempt schedulerAttempt =
+ getCurrentAttemptForContainer(containerId);
+ if (schedulerAttempt != null) {
+ schedulerAttempt.removeRMContainer(containerId);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed container: " + rmContainer.getContainerId() +
+ " in state: " + rmContainer.getState() + " event:" + event);
+ }
+ }
// If the container is getting killed in ACQUIRED state, the requester (AM
// for regular containers and RM itself for AM container) will not know what
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b48b272..f0a8a69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -112,6 +112,8 @@
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+ protected ResourceUsage attemptResourceUsageExternal =
+ new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@@ -288,6 +290,23 @@ public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
+ public synchronized void addRMContainer(
+ ContainerId id, RMContainer rmContainer) {
+ liveContainers.put(id, rmContainer);
+ if (rmContainer.isExternallyAllocated()) {
+ this.attemptResourceUsageExternal.incUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
+ public synchronized void removeRMContainer(ContainerId containerId) {
+ RMContainer rmContainer = liveContainers.remove(containerId);
+ if (rmContainer != null && rmContainer.isExternallyAllocated()) {
+ this.attemptResourceUsageExternal.decUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
protected synchronized void resetReReservations(Priority priority) {
reReservations.setCount(priority, 0);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 7d2ed33..4716bab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -44,15 +44,17 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -175,10 +177,15 @@ public Configuration getYarnConfiguration() {
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+ DistSchedAllocateRequestPBImpl distAllReq =
+ (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
+ DistSchedAllocateRequest.class);
+ distAllReq.setAllocateRequest(allReq);
+ distAllReq.setAllocatedContainers(Arrays.asList(c));
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)allReq).getProto()));
+ distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -243,8 +250,13 @@ public AllocateResponse allocate(AllocateRequest request) throws
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException {
- List askList = request.getAskList();
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List askList =
+ request.getAllocateRequest().getAskList();
+ List allocatedContainers = request.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());