diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
index 490c25b..437dcf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -22,7 +22,8 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -74,5 +75,5 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
@Unstable
@Idempotent
DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException;
+ DistSchedAllocateRequest request) throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
index c1dd9e5..4e26ef4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -22,9 +22,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -35,6 +38,9 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+ .DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -77,9 +83,9 @@ public void close() {
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
@@ -93,10 +99,10 @@ public void close() {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- YarnServiceProtos.AllocateRequestProto requestProto =
- ((AllocateRequestPBImpl) request).getProto();
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto =
+ ((DistSchedAllocateRequestPBImpl) request).getProto();
try {
return new DistSchedAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
index 8be2893..d83c24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -26,17 +26,17 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
- .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+ .DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -77,8 +77,10 @@ public DistributedSchedulerProtocolPBServiceImpl(
@Override
public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
- AllocateRequestProto proto) throws ServiceException {
- AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+ YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto)
+ throws ServiceException {
+ DistSchedAllocateRequestPBImpl request =
+ new DistSchedAllocateRequestPBImpl(proto);
try {
DistSchedAllocateResponse response = real
.allocateForDistributedScheduling(request);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
new file mode 100644
index 0000000..478d45d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+
+@Public
+@Evolving
+public abstract class DistSchedAllocateRequest {
+
+ /**
+ * Get the list of newly allocated Container by the
+ * Distributed Scheduling component on the NodeManager.
+ * @return list of newly allocated Container
+ */
+ @Public
+ @Evolving
+ public abstract List getAllocatedContainers();
+
+ /**
+ * Set the list of newly allocated Container by the
+ * Distributed Scheduling component on the NodeManager.
+ * @param containers list of newly allocated Container
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocatedContainers(List containers);
+
+ /**
+ * Get the underlying AllocateRequest object.
+ * @return Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract AllocateRequest getAllocateRequest();
+
+ /**
+ * Set the underlying AllocateRequest object.
+ * @param allocateRequest Allocate request
+ */
+ @Public
+ @Evolving
+ public abstract void setAllocateRequest(AllocateRequest allocateRequest);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
new file mode 100644
index 0000000..052cf2e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+
+import java.util.List;
+
+public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest {
+ private DistSchedAllocateRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private DistSchedAllocateRequestProto proto;
+ private List containers;
+ private AllocateRequest allocateRequest;
+
+ public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) {
+ this.proto = proto;
+ }
+
+ @Override
+ public List getAllocatedContainers() {
+ if (this.containers != null) {
+ return this.containers;
+ }
+ initAllocatedContainers();
+ return containers;
+ }
+
+ private void initAllocatedContainers() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getAllocatedContainersList();
+ this.containers = new ArrayList();
+ for (ContainerProto c : list) {
+ this.containers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setAllocatedContainers(List containers) {
+ maybeInitBuilder();
+ if (containers == null) {
+ builder.clearAllocatedContainers();
+ }
+ this.containers = containers;
+ }
+
+ @Override
+ public AllocateRequest getAllocateRequest() {
+ DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.allocateRequest != null) {
+ return this.allocateRequest;
+ }
+ if (!p.hasAllocateRequest()) {
+ return null;
+ }
+ this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest());
+ return this.allocateRequest;
+ }
+
+ @Override
+ public void setAllocateRequest(AllocateRequest allocateRequest) {
+ maybeInitBuilder();
+ if (allocateRequest == null) {
+ builder.clearAllocateRequest();
+ }
+ this.allocateRequest = allocateRequest;
+ }
+
+ public DistSchedAllocateRequestProto getProto() {
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DistSchedAllocateRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+ return new ContainerPBImpl(p);
+ }
+
+ private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) {
+ return new AllocateRequestPBImpl(p);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
index b94656c..818eb4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
- rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+ rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a7e5a86..3e3cb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto {
repeated NodeIdProto nodes_for_scheduling = 2;
}
+message DistSchedAllocateRequestProto {
+ optional AllocateRequestProto allocate_request = 1;
+ repeated ContainerProto allocated_containers = 2;
+}
+
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 2b2a2f6..55c65f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,10 +21,10 @@
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
@@ -118,8 +118,8 @@ public AMRMProxyApplicationContext getApplicationContext() {
* @throws IOException
*/
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
}
@@ -135,9 +135,9 @@ public AMRMProxyApplicationContext getApplicationContext() {
*/
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
return (this.nextInterceptor != null) ? this.nextInterceptor
.registerApplicationMasterForDistributedScheduling(request) : null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 1637682..debff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords
.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,8 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request)
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
@@ -212,9 +213,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedAllocateResponse
- allocateForDistributedScheduling(AllocateRequest request) throws
- YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request)
+ throws YarnException, IOException {
throw new IOException("Not Supported !!");
}
};
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 8e2ceb0..6376e71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -99,6 +101,9 @@
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
+ private final static RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
// Currently just used to keep track of allocated Containers
// Can be used for reporting stats later
private Set containersAllocated = new HashSet<>();
@@ -176,7 +181,10 @@ void initLocal(ApplicationAttemptId applicationAttemptId,
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- return allocateForDistributedScheduling(request).getAllocateResponse();
+ DistSchedAllocateRequest distRequest =
+ recordFactory.newRecordInstance(DistSchedAllocateRequest.class);
+ distRequest.setAllocateRequest(request);
+ return allocateForDistributedScheduling(distRequest).getAllocateResponse();
}
@Override
@@ -324,9 +332,9 @@ private void addToNodeList(List nodes) {
@Override
public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
LOG.info("Forwarding registration request to the" +
"Distributed Scheduler Service on YARN RM");
DistSchedRegisterResponse dsResp = getNextInterceptor()
@@ -336,17 +344,18 @@ private void addToNodeList(List nodes) {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks = partitionAskList(request
- .getAskList());
+ PartitionedResourceRequests partitionedAsks = partitionAskList(
+ request.getAllocateRequest().getAskList());
- List releasedContainers = request.getReleaseList();
+ List releasedContainers =
+ request.getAllocateRequest().getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
@@ -355,7 +364,8 @@ private void addToNodeList(List nodes) {
}
// Also, update black list
- ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
+ ResourceBlacklistRequest rbr =
+ request.getAllocateRequest().getResourceBlacklistRequest();
if (rbr != null) {
blacklist.removeAll(rbr.getBlacklistRemovals());
blacklist.addAll(rbr.getBlacklistAdditions());
@@ -383,7 +393,7 @@ private void addToNodeList(List nodes) {
}
// Send all the GUARANTEED Reqs to RM
- request.setAskList(partitionedAsks.getGuaranteed());
+ request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistSchedAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index a1d39f7..31f8085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -126,7 +127,7 @@ public void setBytes(ByteBuffer bytes) {}
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
- Mockito.any(AllocateRequest.class)))
+ Mockito.any(DistSchedAllocateRequest.class)))
.thenAnswer(new Answer() {
@Override
public DistSchedAllocateResponse answer(InvocationOnMock
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index a93f683..e9ef3e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
@@ -32,6 +33,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -45,6 +48,17 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+ .RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@@ -60,6 +74,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -229,9 +244,23 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- AllocateResponse response = allocate(request);
+ public DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List distAllocContainers = request.getAllocatedContainers();
+ for (Container container : distAllocContainers) {
+ // Create RMContainer
+ SchedulerApplicationAttempt appAttempt =
+ ((AbstractYarnScheduler) rmContext.getScheduler())
+ .getCurrentAttemptForContainer(container.getId());
+ RMContainer rmContainer = new RMContainerImpl(container,
+ appAttempt.getApplicationAttemptId(), container.getNodeId(),
+ appAttempt.getUser(), rmContext, true);
+ appAttempt.addRMContainer(container.getId(), rmContainer);
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(),
+ RMContainerEventType.LAUNCHED));
+ }
+ AllocateResponse response = allocate(request.getAllocateRequest());
DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
(DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index f37923f..a1826bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -91,4 +92,8 @@
void cancelIncreaseReservation();
String getQueueName();
+
+ ExecutionType getExecutionType();
+
+ boolean isExternallyAllocated();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 5121493..c0d160b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,6 +80,8 @@
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW, RMContainerState.RUNNING,
+ RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.NEW,
EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
@@ -183,6 +186,8 @@
private Resource lastConfirmedResource;
private volatile String queueName;
+ private boolean isExternallyAllocated;
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@@ -190,6 +195,13 @@ public RMContainerImpl(Container container,
.currentTimeMillis(), "");
}
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, boolean isExternallyAllocated) {
+ this(container, appAttemptId, nodeId, user, rmContext, System
+ .currentTimeMillis(), "", isExternallyAllocated);
+ }
+
private boolean saveNonAMContainerMetaInfo;
public RMContainerImpl(Container container,
@@ -202,6 +214,14 @@ public RMContainerImpl(Container container,
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
+ this(container, appAttemptId, nodeId, user, rmContext, creationTime,
+ nodeLabelExpression, false);
+ }
+
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, long creationTime, String nodeLabelExpression,
+ boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
@@ -216,6 +236,7 @@ public RMContainerImpl(Container container,
this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
this.lastConfirmedResource = container.getResource();
+ this.isExternallyAllocated = isExternallyAllocated;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -827,4 +848,14 @@ public void setQueueName(String queueName) {
public String getQueueName() {
return queueName;
}
+
+ @Override
+ public ExecutionType getExecutionType() {
+ return container.getExecutionType();
+ }
+
+ @Override
+ public boolean isExternallyAllocated() {
+ return isExternallyAllocated;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 8f03de2..7c5037c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -524,7 +524,16 @@ public void completedContainer(RMContainer rmContainer,
return;
}
- completedContainerInternal(rmContainer, containerStatus, event);
+ if (!rmContainer.isExternallyAllocated()) {
+ completedContainerInternal(rmContainer, containerStatus, event);
+ } else {
+ ContainerId containerId = rmContainer.getContainerId();
+ SchedulerApplicationAttempt schedulerAttempt =
+ getCurrentAttemptForContainer(containerId);
+ if (schedulerAttempt != null) {
+ schedulerAttempt.removeRMContainer(containerId);
+ }
+ }
// If the container is getting killed in ACQUIRED state, the requester (AM
// for regular containers and RM itself for AM container) will not know what
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index ffb8657..395397e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -112,6 +112,7 @@
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+ protected ResourceUsage attemptResourceUsageOpportunistic = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
@@ -288,6 +289,23 @@ public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
+ public synchronized void addRMContainer(
+ ContainerId id, RMContainer rmContainer) {
+ liveContainers.put(id, rmContainer);
+ if (rmContainer.isExternallyAllocated()) {
+ this.attemptResourceUsageOpportunistic.incUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
+ public synchronized void removeRMContainer(ContainerId containerId) {
+ RMContainer rmContainer = liveContainers.remove(containerId);
+ if (rmContainer != null && rmContainer.isExternallyAllocated()) {
+ this.attemptResourceUsageOpportunistic.decUsed(
+ rmContainer.getAllocatedResource());
+ }
+ }
+
protected synchronized void resetReReservations(Priority priority) {
reReservations.setCount(priority, 0);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 7d2ed33..b130cfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -47,12 +47,15 @@
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -175,10 +178,14 @@ public Configuration getYarnConfiguration() {
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());
+ DistSchedAllocateRequestPBImpl distAllReq =
+ (DistSchedAllocateRequestPBImpl)factory.newRecordInstance(
+ DistSchedAllocateRequest.class);
+ distAllReq.setAllocateRequest(allReq);
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)allReq).getProto()));
+ distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -243,8 +250,8 @@ public AllocateResponse allocate(AllocateRequest request) throws
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
- AllocateRequest request) throws YarnException, IOException {
- List askList = request.getAskList();
+ DistSchedAllocateRequest request) throws YarnException, IOException {
+ List askList = request.getAllocateRequest().getAskList();
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());