diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 1a0f30a..4b62358 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; @@ -339,4 +340,17 @@ public static ExecutionTypeRequest convertFromProtoFormat( ExecutionTypeRequestProto e) { return new ExecutionTypeRequestPBImpl(e); } + + /* + * Container + */ + public static YarnProtos.ContainerProto convertToProtoFormat( + Container t) { + return ((ContainerPBImpl)t).getProto(); + } + + public static ContainerStatusPBImpl convertFromProtoFormat( + YarnProtos.ContainerStatusProto p) { + return new ContainerStatusPBImpl(p); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java index 490c25b..26faa8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java @@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -74,5 +74,5 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( @Unstable @Idempotent DistSchedAllocateResponse allocateForDistributedScheduling( - AllocateRequest request) throws YarnException, IOException; + DistSchedAllocateRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java index c1dd9e5..713ee28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java @@ -22,9 +22,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -35,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; + +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb @@ -77,9 +81,9 @@ public void close() { @Override public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto = ((RegisterApplicationMasterRequestPBImpl) request).getProto(); try { @@ -93,10 +97,10 @@ public void close() { } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { - YarnServiceProtos.AllocateRequestProto requestProto = - ((AllocateRequestPBImpl) request).getProto(); + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) throws YarnException, IOException { + YarnServerCommonServiceProtos.DistSchedAllocateRequestProto requestProto = + ((DistSchedAllocateRequestPBImpl) request).getProto(); try { return new DistSchedAllocateResponsePBImpl( proxy.allocateForDistributedScheduling(null, requestProto)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java index 8be2893..2763259 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb @@ -77,8 +78,10 @@ public DistributedSchedulerProtocolPBServiceImpl( @Override public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto allocateForDistributedScheduling(RpcController controller, - AllocateRequestProto proto) throws ServiceException { - AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto); + YarnServerCommonServiceProtos.DistSchedAllocateRequestProto proto) + throws ServiceException { + DistSchedAllocateRequestPBImpl request = + new DistSchedAllocateRequestPBImpl(proto); try { DistSchedAllocateResponse response = real .allocateForDistributedScheduling(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java new file mode 100644 index 0000000..afac25e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateRequest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.Container; + +import java.util.List; + +@Public +@Evolving +public abstract class DistSchedAllocateRequest { + + /** + * Get the underlying AllocateRequest object. + * @return Allocate request + */ + @Public + @Evolving + public abstract AllocateRequest getAllocateRequest(); + + /** + * Set the underlying AllocateRequest object. + * @param allocateRequest Allocate request + */ + @Public + @Evolving + public abstract void setAllocateRequest(AllocateRequest allocateRequest); + + /** + * Get the list of newly allocated Container by the + * Distributed Scheduling component on the NodeManager. + * @return list of newly allocated Container + */ + @Public + @Evolving + public abstract List getAllocatedContainers(); + + /** + * Set the list of newly allocated Container by the + * Distributed Scheduling component on the NodeManager. + * @param containers list of newly allocated Container + */ + @Public + @Evolving + public abstract void setAllocatedContainers(List containers); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java new file mode 100644 index 0000000..b809b76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateRequestPBImpl.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.DistSchedAllocateRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; + +import java.util.Iterator; +import java.util.List; + +public class DistSchedAllocateRequestPBImpl extends DistSchedAllocateRequest { + private DistSchedAllocateRequestProto.Builder builder = null; + private boolean viaProto = false; + + private DistSchedAllocateRequestProto proto; + private AllocateRequest allocateRequest; + private List containers; + + public DistSchedAllocateRequestPBImpl() { + builder = DistSchedAllocateRequestProto.newBuilder(); + } + + public DistSchedAllocateRequestPBImpl(DistSchedAllocateRequestProto proto) { + this.proto = proto; + this.viaProto = true; + } + + @Override + public AllocateRequest getAllocateRequest() { + DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.allocateRequest != null) { + return this.allocateRequest; + } + if (!p.hasAllocateRequest()) { + return null; + } + this.allocateRequest = convertFromProtoFormat(p.getAllocateRequest()); + return this.allocateRequest; + } + + @Override + public void setAllocateRequest(AllocateRequest allocateRequest) { + maybeInitBuilder(); + if (allocateRequest == null) { + builder.clearAllocateRequest(); + } + this.allocateRequest = allocateRequest; + } + + @Override + public List getAllocatedContainers() { + if (this.containers != null) { + return this.containers; + } + initAllocatedContainers(); + return containers; + } + + private void initAllocatedContainers() { + DistSchedAllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getAllocatedContainersList(); + this.containers = new ArrayList(); + for (ContainerProto c : list) { + this.containers.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setAllocatedContainers(List containers) { + maybeInitBuilder(); + if (containers == null || containers.isEmpty()) { + if (this.containers != null) { + this.containers.clear(); + } + builder.clearAllocatedContainers(); + return; + } + this.containers = new ArrayList<>(); + this.containers.addAll(containers); + } + + public DistSchedAllocateRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = DistSchedAllocateRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.containers != null) { + builder.clearAllocatedContainers(); + Iterable iterable = + getContainerProtoIterable(this.containers); + builder.addAllAllocatedContainers(iterable); + } + if (this.allocateRequest != null) { + builder.setAllocateRequest( + ((AllocateRequestPBImpl)this.allocateRequest).getProto()); + } + } + + private Iterable getContainerProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + Iterator iter = newContainersList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerProto next() { + return ProtoUtils.convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + } + }; + } + + private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + return new ContainerPBImpl(p); + } + + private AllocateRequestPBImpl convertFromProtoFormat(AllocateRequestProto p) { + return new AllocateRequestPBImpl(p); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto index b94656c..818eb4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto @@ -35,5 +35,5 @@ import "yarn_server_common_service_protos.proto"; service DistributedSchedulerProtocolService { rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto); rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto); - rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto); + rpc allocateForDistributedScheduling (DistSchedAllocateRequestProto) returns (DistSchedAllocateResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index a7e5a86..3e3cb82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -41,6 +41,11 @@ message DistSchedAllocateResponseProto { repeated NodeIdProto nodes_for_scheduling = 2; } +message DistSchedAllocateRequestProto { + optional AllocateRequestProto allocate_request = 1; + repeated ContainerProto allocated_containers = 2; +} + message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index 2b2a2f6..55c65f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration; import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords .RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; @@ -118,8 +118,8 @@ public AMRMProxyApplicationContext getApplicationContext() { * @throws IOException */ @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) throws YarnException, IOException { return (this.nextInterceptor != null) ? this.nextInterceptor.allocateForDistributedScheduling(request) : null; } @@ -135,9 +135,9 @@ public AMRMProxyApplicationContext getApplicationContext() { */ @Override public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { return (this.nextInterceptor != null) ? this.nextInterceptor .registerApplicationMasterForDistributedScheduling(request) : null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 1637682..debff76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords .DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,8 +134,8 @@ public AllocateResponse allocate(final AllocateRequest request) } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) throws YarnException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); @@ -212,9 +213,9 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedAllocateResponse - allocateForDistributedScheduling(AllocateRequest request) throws - YarnException, IOException { + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) + throws YarnException, IOException { throw new IOException("Not Supported !!"); } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 8e2ceb0..a88b9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -21,6 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords @@ -99,6 +102,9 @@ private static final Logger LOG = LoggerFactory .getLogger(LocalScheduler.class); + private final static RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + // Currently just used to keep track of allocated Containers // Can be used for reporting stats later private Set containersAllocated = new HashSet<>(); @@ -176,7 +182,10 @@ void initLocal(ApplicationAttemptId applicationAttemptId, @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - return allocateForDistributedScheduling(request).getAllocateResponse(); + DistSchedAllocateRequest distRequest = + recordFactory.newRecordInstance(DistSchedAllocateRequest.class); + distRequest.setAllocateRequest(request); + return allocateForDistributedScheduling(distRequest).getAllocateResponse(); } @Override @@ -324,9 +333,9 @@ private void addToNodeList(List nodes) { @Override public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { LOG.info("Forwarding registration request to the" + "Distributed Scheduler Service on YARN RM"); DistSchedRegisterResponse dsResp = getNextInterceptor() @@ -336,17 +345,18 @@ private void addToNodeList(List nodes) { } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) throws YarnException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("Forwarding allocate request to the" + "Distributed Scheduler Service on YARN RM"); } // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = partitionAskList(request - .getAskList()); + PartitionedResourceRequests partitionedAsks = partitionAskList( + request.getAllocateRequest().getAskList()); - List releasedContainers = request.getReleaseList(); + List releasedContainers = + request.getAllocateRequest().getReleaseList(); int numReleasedContainers = releasedContainers.size(); if (numReleasedContainers > 0) { LOG.info("AttemptID: " + applicationAttemptId + " released: " @@ -355,7 +365,8 @@ private void addToNodeList(List nodes) { } // Also, update black list - ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + ResourceBlacklistRequest rbr = + request.getAllocateRequest().getResourceBlacklistRequest(); if (rbr != null) { blacklist.removeAll(rbr.getBlacklistRemovals()); blacklist.addAll(rbr.getBlacklistAdditions()); @@ -381,9 +392,10 @@ private void addToNodeList(List nodes) { allocatedContainers.addAll(e.getValue()); } } + request.setAllocatedContainers(allocatedContainers); // Send all the GUARANTEED Reqs to RM - request.setAskList(partitionedAsks.getGuaranteed()); + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); DistSchedAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index a1d39f7..31f8085 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords @@ -126,7 +127,7 @@ public void setBytes(ByteBuffer bytes) {} Mockito.when( finalReqIntcptr.allocateForDistributedScheduling( - Mockito.any(AllocateRequest.class))) + Mockito.any(DistSchedAllocateRequest.class))) .thenAnswer(new Answer() { @Override public DistSchedAllocateResponse answer(InvocationOnMock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index a93f683..5aabddc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; @@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -45,6 +47,12 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; @@ -60,6 +68,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -229,9 +238,23 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { - AllocateResponse response = allocate(request); + public DistSchedAllocateResponse allocateForDistributedScheduling( + DistSchedAllocateRequest request) throws YarnException, IOException { + List distAllocContainers = request.getAllocatedContainers(); + for (Container container : distAllocContainers) { + // Create RMContainer + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new RMContainerImpl(container, + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, true); + appAttempt.addRMContainer(container.getId(), rmContainer); + rmContainer.handle( + new RMContainerEvent(container.getId(), + RMContainerEventType.LAUNCHED)); + } + AllocateResponse response = allocate(request.getAllocateRequest()); DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index f37923f..a1826bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -91,4 +92,8 @@ void cancelIncreaseReservation(); String getQueueName(); + + ExecutionType getExecutionType(); + + boolean isExternallyAllocated(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 95f81d4..21bf124 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -79,6 +80,8 @@ RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, RMContainerState.RUNNING, + RMContainerEventType.LAUNCHED) .addTransition(RMContainerState.NEW, EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) @@ -183,6 +186,8 @@ private Resource lastConfirmedResource; private volatile String queueName; + private boolean isExternallyAllocated; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -190,6 +195,13 @@ public RMContainerImpl(Container container, .currentTimeMillis(), ""); } + public RMContainerImpl(Container container, + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext, boolean isExternallyAllocated) { + this(container, appAttemptId, nodeId, user, rmContext, System + .currentTimeMillis(), "", isExternallyAllocated); + } + private boolean saveNonAMContainerMetaInfo; public RMContainerImpl(Container container, @@ -202,6 +214,14 @@ public RMContainerImpl(Container container, public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, long creationTime, String nodeLabelExpression) { + this(container, appAttemptId, nodeId, user, rmContext, creationTime, + nodeLabelExpression, false); + } + + public RMContainerImpl(Container container, + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext, long creationTime, String nodeLabelExpression, + boolean isExternallyAllocated) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; @@ -216,6 +236,7 @@ public RMContainerImpl(Container container, this.resourceRequests = null; this.nodeLabelExpression = nodeLabelExpression; this.lastConfirmedResource = container.getResource(); + this.isExternallyAllocated = isExternallyAllocated; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -827,4 +848,14 @@ public void setQueueName(String queueName) { public String getQueueName() { return queueName; } + + @Override + public ExecutionType getExecutionType() { + return container.getExecutionType(); + } + + @Override + public boolean isExternallyAllocated() { + return isExternallyAllocated; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 3066339..31ba72e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -516,7 +516,23 @@ public void completedContainer(RMContainer rmContainer, return; } - completedContainerInternal(rmContainer, containerStatus, event); + if (!rmContainer.isExternallyAllocated()) { + completedContainerInternal(rmContainer, containerStatus, event); + } else { + ContainerId containerId = rmContainer.getContainerId(); + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent(containerId, containerStatus, event)); + SchedulerApplicationAttempt schedulerAttempt = + getCurrentAttemptForContainer(containerId); + if (schedulerAttempt != null) { + schedulerAttempt.removeRMContainer(containerId); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + } + } // If the container is getting killed in ACQUIRED state, the requester (AM // for regular containers and RM itself for AM container) will not know what diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b48b272..9726b1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -112,6 +112,8 @@ private boolean isAttemptRecovering; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); + protected ResourceUsage attemptResourceUsageOpportunistic = + new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); @@ -288,6 +290,23 @@ public synchronized RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } + public synchronized void addRMContainer( + ContainerId id, RMContainer rmContainer) { + liveContainers.put(id, rmContainer); + if (rmContainer.isExternallyAllocated()) { + this.attemptResourceUsageOpportunistic.incUsed( + rmContainer.getAllocatedResource()); + } + } + + public synchronized void removeRMContainer(ContainerId containerId) { + RMContainer rmContainer = liveContainers.remove(containerId); + if (rmContainer != null && rmContainer.isExternallyAllocated()) { + this.attemptResourceUsageOpportunistic.decUsed( + rmContainer.getAllocatedResource()); + } + } + protected synchronized void resetReReservations(Priority priority) { reReservations.setCount(priority, 0); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 7d2ed33..4716bab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -44,15 +44,17 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -175,10 +177,15 @@ public Configuration getYarnConfiguration() { Assert.assertEquals(2, dsRegResp.getIncrAllocatableCapabilty().getVirtualCores()); + DistSchedAllocateRequestPBImpl distAllReq = + (DistSchedAllocateRequestPBImpl)factory.newRecordInstance( + DistSchedAllocateRequest.class); + distAllReq.setAllocateRequest(allReq); + distAllReq.setAllocatedContainers(Arrays.asList(c)); DistSchedAllocateResponse dsAllocResp = new DistSchedAllocateResponsePBImpl( dsProxy.allocateForDistributedScheduling(null, - ((AllocateRequestPBImpl)allReq).getProto())); + distAllReq.getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); @@ -243,8 +250,13 @@ public AllocateResponse allocate(AllocateRequest request) throws @Override public DistSchedAllocateResponse allocateForDistributedScheduling( - AllocateRequest request) throws YarnException, IOException { - List askList = request.getAskList(); + DistSchedAllocateRequest request) throws YarnException, IOException { + List askList = + request.getAllocateRequest().getAskList(); + List allocatedContainers = request.getAllocatedContainers(); + Assert.assertEquals(1, allocatedContainers.size()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + allocatedContainers.get(0).getExecutionType()); Assert.assertEquals(1, askList.size()); Assert.assertTrue(askList.get(0) .getExecutionTypeRequest().getEnforceExecutionType());