diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cfe2897..5f90224 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -296,53 +296,60 @@ public static boolean isAclEnabled(Configuration conf) { /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; - /** Is Distributed Scheduling Enabled. */ + /** Setting that controls whether distributed scheduling is enabled or not. */ public static final String DIST_SCHEDULING_ENABLED = YARN_PREFIX + "distributed-scheduling.enabled"; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; - /** Mininum allocatable container memory for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_MIN_MEMORY = - YARN_PREFIX + "distributed-scheduling.min-memory"; - public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512; - - /** Mininum allocatable container vcores for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_MIN_VCORES = - YARN_PREFIX + "distributed-scheduling.min-vcores"; - public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1; - - /** Maximum allocatable container memory for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_MAX_MEMORY = - YARN_PREFIX + "distributed-scheduling.max-memory"; - public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048; - - /** Maximum allocatable container vcores for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_MAX_VCORES = - YARN_PREFIX + "distributed-scheduling.max-vcores"; - public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4; - - /** Incremental allocatable container memory for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_INCR_MEMORY = - YARN_PREFIX + "distributed-scheduling.incr-memory"; - public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512; - - /** Incremental allocatable container vcores for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_INCR_VCORES = + /** Minimum memory (in MB) used for allocating a container through distributed + * scheduling. */ + public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = + YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; + public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; + + /** Minimum virtual CPU cores used for allocating a container through + * distributed scheduling. */ + public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = + YARN_PREFIX + "distributed-scheduling.min-container-vcores"; + public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; + + /** Maximum memory (in MB) used for allocating a container through distributed + * scheduling. */ + public static final String DIST_SCHEDULING_MAX_MEMORY_MB = + YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; + public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; + + /** Maximum virtual CPU cores used for allocating a container through + * distributed scheduling. */ + public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = + YARN_PREFIX + "distributed-scheduling.max-container-vcores"; + public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4; + + /** Incremental memory (in MB) used for allocating a container through + * distributed scheduling. */ + public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = + YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; + public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT = + 512; + + /** Incremental virtual CPU cores used for allocating a container through + * distributed scheduling. */ + public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = YARN_PREFIX + "distributed-scheduling.incr-vcores"; - public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1; + public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; - /** Container token expiry for container allocated via Distributed - * Scheduling. */ + /** Container token expiry for container allocated via distributed + * scheduling. */ public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = - YARN_PREFIX + "distributed-scheduling.container-token-expiry"; + YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = 600000; - /** K least loaded nodes to be provided to the LocalScheduler of a - * NodeManager for Distributed Scheduling. */ - public static final String DIST_SCHEDULING_TOP_K = - YARN_PREFIX + "distributed-scheduling.top-k"; - public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; + /** Number of nodes to be used by the LocalScheduler of a NodeManager for + * dispatching containers during distributed scheduling. */ + public static final String DIST_SCHEDULING_NODES_NUMBER_USED = + YARN_PREFIX + "distributed-scheduling.nodes-used"; + public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; /** Frequency for computing least loaded NMs. */ public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = @@ -373,13 +380,13 @@ public static boolean isAclEnabled(Configuration conf) { YARN_PREFIX + "nm-container-queuing.max-queue-length"; public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10; - /** Min wait time of container queue at NodeManager. */ + /** Min queue wait time for a container at a NodeManager. */ public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms"; public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT = 1; - /** Max wait time of container queue at NodeManager. */ + /** Max queue wait time for a container queue at a NodeManager. */ public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms"; public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT = @@ -1624,17 +1631,21 @@ public static boolean isAclEnabled(Configuration conf) { public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX + "application.classpath"; + /** The setting that controls whether AMRMProxy is enabled or not. */ public static final String AMRM_PROXY_ENABLED = NM_PREFIX - + "amrmproxy.enable"; + + "amrmproxy.enabled"; public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; + public static final String AMRM_PROXY_ADDRESS = NM_PREFIX + "amrmproxy.address"; public static final int DEFAULT_AMRM_PROXY_PORT = 8048; public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:" + DEFAULT_AMRM_PROXY_PORT; + public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX + "amrmproxy.client.thread-count"; public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25; + public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = NM_PREFIX + "amrmproxy.interceptor-class.pipeline"; public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 61b698d..a2c7eeb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -119,21 +119,21 @@ public void initializeMemberVariables() { configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_ENABLED); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY); + .add(YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES); + .add(YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY); + .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES); + .add(YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES); configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY); + .add(YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES); + .add(YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); + .add(YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED); configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS); configurationPrefixToSkipCompare diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java index 490c25b..8489834 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java @@ -23,8 +23,8 @@ import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -55,7 +55,7 @@ @Public @Unstable @Idempotent - DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( + DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -73,6 +73,6 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling( @Public @Unstable @Idempotent - DistSchedAllocateResponse allocateForDistributedScheduling( + DistributedSchedulerAllocateResponse allocateForDistributedScheduling( AllocateRequest request) throws YarnException, IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java index c1dd9e5..563ccf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java @@ -25,26 +25,20 @@ import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; - - -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .FinishApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .FinishApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .RegisterApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.proto.YarnServiceProtos; @@ -76,14 +70,14 @@ public void close() { } @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto = ((RegisterApplicationMasterRequestPBImpl) request).getProto(); try { - return new DistSchedRegisterResponsePBImpl( + return new DistributedSchedulerRegisterResponsePBImpl( proxy.registerApplicationMasterForDistributedScheduling( null, requestProto)); } catch (ServiceException e) { @@ -93,12 +87,12 @@ public void close() { } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { YarnServiceProtos.AllocateRequestProto requestProto = ((AllocateRequestPBImpl) request).getProto(); try { - return new DistSchedAllocateResponsePBImpl( + return new DistributedSchedulerAllocateResponsePBImpl( proxy.allocateForDistributedScheduling(null, requestProto)); } catch (ServiceException e) { RPCUtil.unwrapAndThrowException(e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java index 8be2893..1ecdad0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java @@ -24,15 +24,15 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords .FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb .FinishApplicationMasterRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb @@ -57,16 +57,16 @@ public DistributedSchedulerProtocolPBServiceImpl( } @Override - public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto + public YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto registerApplicationMasterForDistributedScheduling(RpcController controller, RegisterApplicationMasterRequestProto proto) throws ServiceException { RegisterApplicationMasterRequestPBImpl request = new RegisterApplicationMasterRequestPBImpl(proto); try { - DistSchedRegisterResponse response = + DistributedSchedulerRegisterResponse response = real.registerApplicationMasterForDistributedScheduling(request); - return ((DistSchedRegisterResponsePBImpl) response).getProto(); + return ((DistributedSchedulerRegisterResponsePBImpl) response).getProto(); } catch (YarnException e) { throw new ServiceException(e); } catch (IOException e) { @@ -75,14 +75,14 @@ public DistributedSchedulerProtocolPBServiceImpl( } @Override - public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto + public YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto allocateForDistributedScheduling(RpcController controller, AllocateRequestProto proto) throws ServiceException { AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto); try { - DistSchedAllocateResponse response = real + DistributedSchedulerAllocateResponse response = real .allocateForDistributedScheduling(request); - return ((DistSchedAllocateResponsePBImpl) response).getProto(); + return ((DistributedSchedulerAllocateResponsePBImpl) response).getProto(); } catch (YarnException e) { throw new ServiceException(e); } catch (IOException e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java deleted file mode 100644 index 5f6e069..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.protocolrecords; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.util.Records; - -import java.util.List; - -@Public -@Unstable -public abstract class DistSchedAllocateResponse { - - @Public - @Unstable - public static DistSchedAllocateResponse newInstance(AllocateResponse - allResp) { - DistSchedAllocateResponse response = - Records.newRecord(DistSchedAllocateResponse.class); - response.setAllocateResponse(allResp); - return response; - } - - @Public - @Unstable - public abstract void setAllocateResponse(AllocateResponse response); - - @Public - @Unstable - public abstract AllocateResponse getAllocateResponse(); - - @Public - @Unstable - public abstract void setNodesForScheduling(List nodesForScheduling); - - @Public - @Unstable - public abstract List getNodesForScheduling(); -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java deleted file mode 100644 index e4e5138..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.protocolrecords; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Records; - -import java.util.List; - -@Public -@Unstable -public abstract class DistSchedRegisterResponse { - - @Public - @Unstable - public static DistSchedRegisterResponse newInstance - (RegisterApplicationMasterResponse regAMResp) { - DistSchedRegisterResponse response = - Records.newRecord(DistSchedRegisterResponse.class); - response.setRegisterResponse(regAMResp); - return response; - } - - @Public - @Unstable - public abstract void setRegisterResponse( - RegisterApplicationMasterResponse resp); - - @Public - @Unstable - public abstract RegisterApplicationMasterResponse getRegisterResponse(); - - @Public - @Unstable - public abstract void setMinAllocatableCapabilty(Resource minResource); - - @Public - @Unstable - public abstract Resource getMinAllocatableCapabilty(); - - @Public - @Unstable - public abstract void setMaxAllocatableCapabilty(Resource maxResource); - - @Public - @Unstable - public abstract Resource getMaxAllocatableCapabilty(); - - @Public - @Unstable - public abstract void setIncrAllocatableCapabilty(Resource maxResource); - - @Public - @Unstable - public abstract Resource getIncrAllocatableCapabilty(); - - @Public - @Unstable - public abstract void setContainerTokenExpiryInterval(int interval); - - @Public - @Unstable - public abstract int getContainerTokenExpiryInterval(); - - @Public - @Unstable - public abstract void setContainerIdStart(long containerIdStart); - - @Public - @Unstable - public abstract long getContainerIdStart(); - - @Public - @Unstable - public abstract void setNodesForScheduling(List nodesForScheduling); - - @Public - @Unstable - public abstract List getNodesForScheduling(); - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java new file mode 100644 index 0000000..29fd9e3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +@Public +@Unstable +public abstract class DistributedSchedulerAllocateResponse { + + @Public + @Unstable + public static DistributedSchedulerAllocateResponse newInstance( + AllocateResponse allResp) { + DistributedSchedulerAllocateResponse response = + Records.newRecord(DistributedSchedulerAllocateResponse.class); + response.setAllocateResponse(allResp); + return response; + } + + @Public + @Unstable + public abstract void setAllocateResponse(AllocateResponse response); + + @Public + @Unstable + public abstract AllocateResponse getAllocateResponse(); + + @Public + @Unstable + public abstract void setNodesForScheduling(List nodesForScheduling); + + @Public + @Unstable + public abstract List getNodesForScheduling(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java new file mode 100644 index 0000000..1db5167 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +@Public +@Unstable +public abstract class DistributedSchedulerRegisterResponse { + + @Public + @Unstable + public static DistributedSchedulerRegisterResponse newInstance + (RegisterApplicationMasterResponse regAMResp) { + DistributedSchedulerRegisterResponse response = + Records.newRecord(DistributedSchedulerRegisterResponse.class); + response.setRegisterResponse(regAMResp); + return response; + } + + @Public + @Unstable + public abstract void setRegisterResponse( + RegisterApplicationMasterResponse resp); + + @Public + @Unstable + public abstract RegisterApplicationMasterResponse getRegisterResponse(); + + @Public + @Unstable + public abstract void setMinContainerResource(Resource minResource); + + @Public + @Unstable + public abstract Resource getMinContainerResource(); + + @Public + @Unstable + public abstract void setMaxContainerResource(Resource maxResource); + + @Public + @Unstable + public abstract Resource getMaxContainerResource(); + + @Public + @Unstable + public abstract void setIncrContainerResource(Resource maxResource); + + @Public + @Unstable + public abstract Resource getIncrContainerResource(); + + @Public + @Unstable + public abstract void setContainerTokenExpiryInterval(int interval); + + @Public + @Unstable + public abstract int getContainerTokenExpiryInterval(); + + @Public + @Unstable + public abstract void setContainerIdStart(long containerIdStart); + + @Public + @Unstable + public abstract long getContainerIdStart(); + + @Public + @Unstable + public abstract void setNodesForScheduling(List nodesForScheduling); + + @Public + @Unstable + public abstract List getNodesForScheduling(); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java deleted file mode 100644 index 3ea4965..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; - -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .AllocateResponsePBImpl; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.proto.YarnProtos; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; -import org.apache.hadoop.yarn.proto.YarnServiceProtos; -import org.apache.hadoop.yarn.server.api.protocolrecords - .DistSchedAllocateResponse; - - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse { - - YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto = - YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance(); - YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null; - boolean viaProto = false; - - private AllocateResponse allocateResponse; - private List nodesForScheduling; - - public DistSchedAllocateResponsePBImpl() { - builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(); - } - - public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) { - this.proto = proto; - viaProto = true; - } - - public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto); - } - viaProto = false; - } - - private synchronized void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private synchronized void mergeLocalToBuilder() { - if (this.nodesForScheduling != null) { - builder.clearNodesForScheduling(); - Iterable iterable = - getNodeIdProtoIterable(this.nodesForScheduling); - builder.addAllNodesForScheduling(iterable); - } - if (this.allocateResponse != null) { - builder.setAllocateResponse( - ((AllocateResponsePBImpl)this.allocateResponse).getProto()); - } - } - @Override - public void setAllocateResponse(AllocateResponse response) { - maybeInitBuilder(); - if(allocateResponse == null) { - builder.clearAllocateResponse(); - } - this.allocateResponse = response; - } - - @Override - public AllocateResponse getAllocateResponse() { - if (this.allocateResponse != null) { - return this.allocateResponse; - } - - YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p = - viaProto ? proto : builder; - if (!p.hasAllocateResponse()) { - return null; - } - - this.allocateResponse = - new AllocateResponsePBImpl(p.getAllocateResponse()); - return this.allocateResponse; - } - - @Override - public void setNodesForScheduling(List nodesForScheduling) { - maybeInitBuilder(); - if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { - if (this.nodesForScheduling != null) { - this.nodesForScheduling.clear(); - } - builder.clearNodesForScheduling(); - return; - } - this.nodesForScheduling = new ArrayList<>(); - this.nodesForScheduling.addAll(nodesForScheduling); - } - - @Override - public List getNodesForScheduling() { - if (nodesForScheduling != null) { - return nodesForScheduling; - } - initLocalNodesForSchedulingList(); - return nodesForScheduling; - } - - private synchronized void initLocalNodesForSchedulingList() { - YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p = - viaProto ? proto : builder; - List list = p.getNodesForSchedulingList(); - nodesForScheduling = new ArrayList<>(); - if (list != null) { - for (YarnProtos.NodeIdProto t : list) { - nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); - } - } - } - private synchronized Iterable getNodeIdProtoIterable( - final List nodeList) { - maybeInitBuilder(); - return new Iterable() { - @Override - public synchronized Iterator iterator() { - return new Iterator() { - - Iterator iter = nodeList.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public YarnProtos.NodeIdProto next() { - return ProtoUtils.convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java deleted file mode 100644 index 0322c70..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java +++ /dev/null @@ -1,304 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; - -import com.google.protobuf.TextFormat; - -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - - -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .RegisterApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.proto.YarnProtos; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; -import org.apache.hadoop.yarn.server.api.protocolrecords - .DistSchedRegisterResponse; - - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse { - - YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto = - YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance(); - YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null; - boolean viaProto = false; - - private Resource maxAllocatableCapability; - private Resource minAllocatableCapability; - private Resource incrAllocatableCapability; - private List nodesForScheduling; - private RegisterApplicationMasterResponse registerApplicationMasterResponse; - - public DistSchedRegisterResponsePBImpl() { - builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(); - } - - public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) { - this.proto = proto; - viaProto = true; - } - - public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto); - } - viaProto = false; - } - - private synchronized void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private synchronized void mergeLocalToBuilder() { - if (this.nodesForScheduling != null) { - builder.clearNodesForScheduling(); - Iterable iterable = - getNodeIdProtoIterable(this.nodesForScheduling); - builder.addAllNodesForScheduling(iterable); - } - if (this.maxAllocatableCapability != null) { - builder.setMaxAllocCapability( - ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability)); - } - if (this.minAllocatableCapability != null) { - builder.setMaxAllocCapability( - ProtoUtils.convertToProtoFormat(this.minAllocatableCapability)); - } - if (this.registerApplicationMasterResponse != null) { - builder.setRegisterResponse( - ((RegisterApplicationMasterResponsePBImpl) - this.registerApplicationMasterResponse).getProto()); - } - } - - @Override - public void setRegisterResponse(RegisterApplicationMasterResponse resp) { - maybeInitBuilder(); - if(registerApplicationMasterResponse == null) { - builder.clearRegisterResponse(); - } - this.registerApplicationMasterResponse = resp; - } - - @Override - public RegisterApplicationMasterResponse getRegisterResponse() { - if (this.registerApplicationMasterResponse != null) { - return this.registerApplicationMasterResponse; - } - - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasRegisterResponse()) { - return null; - } - - this.registerApplicationMasterResponse = - new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse()); - return this.registerApplicationMasterResponse; - } - - @Override - public void setMaxAllocatableCapabilty(Resource maxResource) { - maybeInitBuilder(); - if(maxAllocatableCapability == null) { - builder.clearMaxAllocCapability(); - } - this.maxAllocatableCapability = maxResource; - } - - @Override - public Resource getMaxAllocatableCapabilty() { - if (this.maxAllocatableCapability != null) { - return this.maxAllocatableCapability; - } - - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasMaxAllocCapability()) { - return null; - } - - this.maxAllocatableCapability = - ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability()); - return this.maxAllocatableCapability; - } - - @Override - public void setMinAllocatableCapabilty(Resource minResource) { - maybeInitBuilder(); - if(minAllocatableCapability == null) { - builder.clearMinAllocCapability(); - } - this.minAllocatableCapability = minResource; - } - - @Override - public Resource getMinAllocatableCapabilty() { - if (this.minAllocatableCapability != null) { - return this.minAllocatableCapability; - } - - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasMinAllocCapability()) { - return null; - } - - this.minAllocatableCapability = - ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability()); - return this.minAllocatableCapability; - } - - @Override - public void setIncrAllocatableCapabilty(Resource incrResource) { - maybeInitBuilder(); - if(incrAllocatableCapability == null) { - builder.clearIncrAllocCapability(); - } - this.incrAllocatableCapability = incrResource; - } - - @Override - public Resource getIncrAllocatableCapabilty() { - if (this.incrAllocatableCapability != null) { - return this.incrAllocatableCapability; - } - - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasIncrAllocCapability()) { - return null; - } - - this.incrAllocatableCapability = - ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability()); - return this.incrAllocatableCapability; - } - - @Override - public void setContainerTokenExpiryInterval(int interval) { - maybeInitBuilder(); - builder.setContainerTokenExpiryInterval(interval); - } - - @Override - public int getContainerTokenExpiryInterval() { - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasContainerTokenExpiryInterval()) { - return 0; - } - return p.getContainerTokenExpiryInterval(); - } - - @Override - public void setContainerIdStart(long containerIdStart) { - maybeInitBuilder(); - builder.setContainerIdStart(containerIdStart); - } - - @Override - public long getContainerIdStart() { - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasContainerIdStart()) { - return 0; - } - return p.getContainerIdStart(); - } - - - @Override - public void setNodesForScheduling(List nodesForScheduling) { - maybeInitBuilder(); - if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { - if (this.nodesForScheduling != null) { - this.nodesForScheduling.clear(); - } - builder.clearNodesForScheduling(); - return; - } - this.nodesForScheduling = new ArrayList<>(); - this.nodesForScheduling.addAll(nodesForScheduling); - } - - @Override - public List getNodesForScheduling() { - if (nodesForScheduling != null) { - return nodesForScheduling; - } - initLocalNodesForSchedulingList(); - return nodesForScheduling; - } - - private synchronized void initLocalNodesForSchedulingList() { - YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getNodesForSchedulingList(); - nodesForScheduling = new ArrayList<>(); - if (list != null) { - for (YarnProtos.NodeIdProto t : list) { - nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); - } - } - } - private synchronized Iterable getNodeIdProtoIterable( - final List nodeList) { - maybeInitBuilder(); - return new Iterable() { - @Override - public synchronized Iterator iterator() { - return new Iterator() { - - Iterator iter = nodeList.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public YarnProtos.NodeIdProto next() { - return ProtoUtils.convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java new file mode 100644 index 0000000..6e0b810 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; + +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class DistributedSchedulerAllocateResponsePBImpl extends + DistributedSchedulerAllocateResponse { + + YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto + proto = YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProto.getDefaultInstance(); + YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto. + Builder builder = null; + boolean viaProto = false; + + private AllocateResponse allocateResponse; + private List nodesForScheduling; + + public DistributedSchedulerAllocateResponsePBImpl() { + builder = YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProto.newBuilder(); + } + + public DistributedSchedulerAllocateResponsePBImpl( + YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto + getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (this.nodesForScheduling != null) { + builder.clearNodesForScheduling(); + Iterable iterable = getNodeIdProtoIterable( + this.nodesForScheduling); + builder.addAllNodesForScheduling(iterable); + } + if (this.allocateResponse != null) { + builder.setAllocateResponse( + ((AllocateResponsePBImpl) this.allocateResponse).getProto()); + } + } + + @Override + public void setAllocateResponse(AllocateResponse response) { + maybeInitBuilder(); + if (allocateResponse == null) { + builder.clearAllocateResponse(); + } + this.allocateResponse = response; + } + + @Override + public AllocateResponse getAllocateResponse() { + if (this.allocateResponse != null) { + return this.allocateResponse; + } + + YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasAllocateResponse()) { + return null; + } + + this.allocateResponse = new AllocateResponsePBImpl(p.getAllocateResponse()); + return this.allocateResponse; + } + + @Override + public void setNodesForScheduling(List nodesForScheduling) { + maybeInitBuilder(); + if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { + if (this.nodesForScheduling != null) { + this.nodesForScheduling.clear(); + } + builder.clearNodesForScheduling(); + return; + } + this.nodesForScheduling = new ArrayList<>(); + this.nodesForScheduling.addAll(nodesForScheduling); + } + + @Override + public List getNodesForScheduling() { + if (nodesForScheduling != null) { + return nodesForScheduling; + } + initLocalNodesForSchedulingList(); + return nodesForScheduling; + } + + private synchronized void initLocalNodesForSchedulingList() { + YarnServerCommonServiceProtos. + DistributedSchedulerAllocateResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getNodesForSchedulingList(); + nodesForScheduling = new ArrayList<>(); + if (list != null) { + for (YarnProtos.NodeIdProto t : list) { + nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); + } + } + } + + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nodeList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public YarnProtos.NodeIdProto next() { + return ProtoUtils.convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java new file mode 100644 index 0000000..5504650 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import com.google.protobuf.TextFormat; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class DistributedSchedulerRegisterResponsePBImpl extends + DistributedSchedulerRegisterResponse { + + YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto + proto = + YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto + .getDefaultInstance(); + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProto.Builder builder = null; + boolean viaProto = false; + + private Resource maxContainerResource; + private Resource minContainerRequest; + private Resource incrContainerRequest; + private List nodesForScheduling; + private RegisterApplicationMasterResponse registerApplicationMasterResponse; + + public DistributedSchedulerRegisterResponsePBImpl() { + builder = YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProto.newBuilder(); + } + + public DistributedSchedulerRegisterResponsePBImpl( + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto + getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (this.nodesForScheduling != null) { + builder.clearNodesForScheduling(); + Iterable iterable = getNodeIdProtoIterable( + this.nodesForScheduling); + builder.addAllNodesForScheduling(iterable); + } + if (this.maxContainerResource != null) { + builder.setMaxContainerResource(ProtoUtils.convertToProtoFormat( + this.maxContainerResource)); + } + if (this.minContainerRequest != null) { + builder.setMinContainerResource(ProtoUtils.convertToProtoFormat( + this.minContainerRequest)); + } + if (this.registerApplicationMasterResponse != null) { + builder.setRegisterResponse( + ((RegisterApplicationMasterResponsePBImpl) + this.registerApplicationMasterResponse).getProto()); + } + } + + @Override + public void setRegisterResponse(RegisterApplicationMasterResponse resp) { + maybeInitBuilder(); + if (registerApplicationMasterResponse == null) { + builder.clearRegisterResponse(); + } + this.registerApplicationMasterResponse = resp; + } + + @Override + public RegisterApplicationMasterResponse getRegisterResponse() { + if (this.registerApplicationMasterResponse != null) { + return this.registerApplicationMasterResponse; + } + + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasRegisterResponse()) { + return null; + } + + this.registerApplicationMasterResponse = + new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse()); + return this.registerApplicationMasterResponse; + } + + @Override + public void setMaxContainerResource(Resource maxResource) { + maybeInitBuilder(); + if (maxContainerResource == null) { + builder.clearMaxContainerResource(); + } + this.maxContainerResource = maxResource; + } + + @Override + public Resource getMaxContainerResource() { + if (this.maxContainerResource != null) { + return this.maxContainerResource; + } + + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasMaxContainerResource()) { + return null; + } + + this.maxContainerResource = ProtoUtils.convertFromProtoFormat(p + .getMaxContainerResource()); + return this.maxContainerResource; + } + + @Override + public void setMinContainerResource(Resource minResource) { + maybeInitBuilder(); + if (minContainerRequest == null) { + builder.clearMinContainerResource(); + } + this.minContainerRequest = minResource; + } + + @Override + public Resource getMinContainerResource() { + if (this.minContainerRequest != null) { + return this.minContainerRequest; + } + + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasMinContainerResource()) { + return null; + } + + this.minContainerRequest = ProtoUtils.convertFromProtoFormat(p + .getMinContainerResource()); + return this.minContainerRequest; + } + + @Override + public void setIncrContainerResource(Resource incrResource) { + maybeInitBuilder(); + if (incrContainerRequest == null) { + builder.clearIncrContainerResource(); + } + this.incrContainerRequest = incrResource; + } + + @Override + public Resource getIncrContainerResource() { + if (this.incrContainerRequest != null) { + return this.incrContainerRequest; + } + + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasIncrContainerResource()) { + return null; + } + + this.incrContainerRequest = ProtoUtils.convertFromProtoFormat(p + .getIncrContainerResource()); + return this.incrContainerRequest; + } + + @Override + public void setContainerTokenExpiryInterval(int interval) { + maybeInitBuilder(); + builder.setContainerTokenExpiryInterval(interval); + } + + @Override + public int getContainerTokenExpiryInterval() { + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasContainerTokenExpiryInterval()) { + return 0; + } + return p.getContainerTokenExpiryInterval(); + } + + @Override + public void setContainerIdStart(long containerIdStart) { + maybeInitBuilder(); + builder.setContainerIdStart(containerIdStart); + } + + @Override + public long getContainerIdStart() { + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasContainerIdStart()) { + return 0; + } + return p.getContainerIdStart(); + } + + @Override + public void setNodesForScheduling(List nodesForScheduling) { + maybeInitBuilder(); + if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { + if (this.nodesForScheduling != null) { + this.nodesForScheduling.clear(); + } + builder.clearNodesForScheduling(); + return; + } + this.nodesForScheduling = new ArrayList<>(); + this.nodesForScheduling.addAll(nodesForScheduling); + } + + @Override + public List getNodesForScheduling() { + if (nodesForScheduling != null) { + return nodesForScheduling; + } + initLocalNodesForSchedulingList(); + return nodesForScheduling; + } + + private synchronized void initLocalNodesForSchedulingList() { + YarnServerCommonServiceProtos. + DistributedSchedulerRegisterResponseProtoOrBuilder p = + viaProto ? proto : builder; + List list = p.getNodesForSchedulingList(); + nodesForScheduling = new ArrayList<>(); + if (list != null) { + for (YarnProtos.NodeIdProto t : list) { + nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); + } + } + } + + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nodeList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public YarnProtos.NodeIdProto next() { + return ProtoUtils.convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java index a7f0ece..fb567d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java @@ -41,5 +41,5 @@ public static QueuedContainersStatus newInstance() { public abstract int getWaitQueueLength(); - public abstract void setWaitQueueLength(int queueWaitTime); + public abstract void setWaitQueueLength(int waitQueueLength); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto index 7e3a77f..4622a53 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto @@ -33,6 +33,6 @@ import "yarn_server_common_service_protos.proto"; service DistributedSchedulerProtocolService { - rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto); - rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto); + rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistributedSchedulerRegisterResponseProto); + rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistributedSchedulerAllocateResponseProto); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index a7e5a86..c05f243 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -26,17 +26,17 @@ import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; import "yarn_service_protos.proto"; -message DistSchedRegisterResponseProto { +message DistributedSchedulerRegisterResponseProto { optional RegisterApplicationMasterResponseProto register_response = 1; - optional ResourceProto max_alloc_capability = 2; - optional ResourceProto min_alloc_capability = 3; - optional ResourceProto incr_alloc_capability = 4; + optional ResourceProto max_container_resource = 2; + optional ResourceProto min_container_resource = 3; + optional ResourceProto incr_container_resource = 4; optional int32 container_token_expiry_interval = 5; optional int64 container_id_start = 6; repeated NodeIdProto nodes_for_scheduling = 7; } -message DistSchedAllocateResponseProto { +message DistributedSchedulerAllocateResponseProto { optional AllocateResponseProto allocate_response = 1; repeated NodeIdProto nodes_for_scheduling = 2; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index ac360f4..e1abe81 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -467,8 +467,7 @@ protected RequestInterceptor createRequestInterceptorChain() { interceptorClassNames.add(item.trim()); } - // Make sure LocalScheduler is present at the beginning - // of the chain.. + // Make sure LocalScheduler is present at the beginning of the chain. if (this.nmContext.isDistributedSchedulingEnabled()) { interceptorClassNames.add(0, LocalScheduler.class.getName()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index 2b2a2f6..ce7fb5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.api.protocolrecords .RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import java.io.IOException; @@ -118,7 +118,7 @@ public AMRMProxyApplicationContext getApplicationContext() { * @throws IOException */ @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { return (this.nextInterceptor != null) ? this.nextInterceptor.allocateForDistributedScheduling(request) : null; @@ -134,7 +134,7 @@ public AMRMProxyApplicationContext getApplicationContext() { * @throws IOException */ @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 1637682..494da5d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -45,9 +45,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.server.api.ServerRMProxy; -import org.apache.hadoop.yarn.server.api.protocolrecords - .DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,7 +122,7 @@ public AllocateResponse allocate(final AllocateRequest request) } @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { @@ -133,13 +132,13 @@ public AllocateResponse allocate(final AllocateRequest request) } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); } - DistSchedAllocateResponse allocateResponse = + DistributedSchedulerAllocateResponse allocateResponse = rmClient.allocateForDistributedScheduling(request); if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); @@ -204,7 +203,7 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { @@ -212,7 +211,7 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedAllocateResponse + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling(AllocateRequest request) throws YarnException, IOException { throw new IOException("Not Supported !!"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 4f9d5a3..707051f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -200,6 +200,7 @@ private void startAllocatedContainer( .getContainerTokenIdentifier().getContainerID(); this.context.getQueuingContext().getQueuedContainers().remove(containerId); try { + LOG.info("Starting container [" + containerId + "]"); super.startContainerInternal( allocatedContainerInfo.getContainerTokenIdentifier(), allocatedContainerInfo.getStartRequest()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 42c1dcd..4fee58c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -21,8 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords .FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords @@ -64,16 +64,16 @@ import java.util.TreeMap; /** - *

The LocalScheduler runs on the NodeManager and is modelled as an + *

The LocalScheduler runs on the NodeManager and is modeled as an * AMRMProxy request interceptor. It is responsible for the - * following :

+ * following:

*
    *
  • Intercept ApplicationMasterProtocol calls and unwrap the * response objects to extract instructions from the - * ClusterManager running on the ResourceManager to aid in making - * Scheduling scheduling decisions
  • + * ClusterMonitor running on the ResourceManager to aid in making + * distributed scheduling decisions. *
  • Call the OpportunisticContainerAllocator to allocate - * containers for the opportunistic resource outstandingOpReqs
  • + * containers for the outstanding OPPORTUNISTIC container requests. *
*/ public final class LocalScheduler extends AbstractRequestInterceptor { @@ -89,7 +89,7 @@ } } - static class DistSchedulerParams { + static class DistributedSchedulerParams { Resource maxResource; Resource minResource; Resource incrementResource; @@ -99,13 +99,15 @@ private static final Logger LOG = LoggerFactory .getLogger(LocalScheduler.class); - // Currently just used to keep track of allocated Containers - // Can be used for reporting stats later + // Currently just used to keep track of allocated containers. + // Can be used for reporting stats later. private Set containersAllocated = new HashSet<>(); - private DistSchedulerParams appParams = new DistSchedulerParams(); - private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter = - new OpportunisticContainerAllocator.ContainerIdCounter(); + private DistributedSchedulerParams appParams = + new DistributedSchedulerParams(); + private final OpportunisticContainerAllocator.ContainerIdCounter + containerIdCounter = + new OpportunisticContainerAllocator.ContainerIdCounter(); private Map nodeList = new HashMap<>(); // Mapping of NodeId to NodeTokens. Populated either from RM response or @@ -116,7 +118,7 @@ // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, // Resource Name (Host/rack/any) and capability. This mapping is required // to match a received Container to an outstanding OPPORTUNISTIC - // ResourceRequests (ask) + // ResourceRequest (ask). final TreeMap> outstandingOpReqs = new TreeMap<>(); @@ -189,9 +191,6 @@ public AllocateResponse allocate(AllocateRequest request) throws /** * Check if we already have a NMToken. if Not, generate the Token and * add it to the response - * @param response - * @param nmTokens - * @param allocatedContainers */ private void updateResponseWithNMTokens(AllocateResponse response, List nmTokens, List allocatedContainers) { @@ -224,11 +223,11 @@ private PartitionedResourceRequests partitionAskList(List } private void updateParameters( - DistSchedRegisterResponse registerResponse) { - appParams.minResource = registerResponse.getMinAllocatableCapabilty(); - appParams.maxResource = registerResponse.getMaxAllocatableCapabilty(); + DistributedSchedulerRegisterResponse registerResponse) { + appParams.minResource = registerResponse.getMinContainerResource(); + appParams.maxResource = registerResponse.getMaxContainerResource(); appParams.incrementResource = - registerResponse.getIncrAllocatableCapabilty(); + registerResponse.getIncrContainerResource(); if (appParams.incrementResource == null) { appParams.incrementResource = appParams.minResource; } @@ -242,11 +241,10 @@ private void updateParameters( /** * Takes a list of ResourceRequests (asks), extracts the key information viz. - * (Priority, ResourceName, Capability) and adds it the outstanding + * (Priority, ResourceName, Capability) and adds to the outstanding * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce * the current YARN constraint that only a single ResourceRequest can exist at - * a give Priority and Capability - * @param resourceAsks + * a give Priority and Capability. */ public void addToOutstandingReqs(List resourceAsks) { for (ResourceRequest request : resourceAsks) { @@ -286,9 +284,7 @@ public void addToOutstandingReqs(List resourceAsks) { /** * This method matches a returned list of Container Allocations to any - * outstanding OPPORTUNISTIC ResourceRequest - * @param capability - * @param allocatedContainers + * outstanding OPPORTUNISTIC ResourceRequest. */ public void matchAllocationToOutstandingRequest(Resource capability, List allocatedContainers) { @@ -322,23 +318,25 @@ private void addToNodeList(List nodes) { } @Override - public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { + public DistributedSchedulerRegisterResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { LOG.info("Forwarding registration request to the" + - "Distributed Scheduler Service on YARN RM"); - DistSchedRegisterResponse dsResp = getNextInterceptor() + "Distributed Scheduling Service on YARN RM"); + DistributedSchedulerRegisterResponse dsResp = getNextInterceptor() .registerApplicationMasterForDistributedScheduling(request); updateParameters(dsResp); return dsResp; } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { - LOG.info("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } // Partition requests into GUARANTEED and OPPORTUNISTIC reqs PartitionedResourceRequests partitionedAsks = partitionAskList(request .getAskList()); @@ -381,7 +379,7 @@ private void addToNodeList(List nodes) { // Send all the GUARANTEED Reqs to RM request.setAskList(partitionedAsks.getGuaranteed()); - DistSchedAllocateResponse dsResp = + DistributedSchedulerAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); // Update host to nodeId mapping diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index 03ba61d..2982cbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistributedSchedulerParams; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -40,12 +40,14 @@ import java.util.concurrent.atomic.AtomicLong; /** - *

The OpportunisticContainerAllocator allocates containers on a given list - * of Nodes after it modifies the container sizes to within allowable limits - * specified by the ClusterManager running on the RM. It tries to - * distribute the containers as evenly as possible. It also uses the + *

+ * The OpportunisticContainerAllocator allocates containers on a given list of + * nodes, after modifying the container sizes to respect the limits set by the + * ClusterManager running on the RM. It tries to distribute the + * containers as evenly as possible. It also uses the * NMTokenSecretManagerInNM to generate the required NM tokens for - * the allocated containers

+ * the allocated containers. + *

*/ public class OpportunisticContainerAllocator { @@ -78,32 +80,34 @@ public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater, this.webpagePort = webpagePort; } - public Map> allocate(DistSchedulerParams appParams, - ContainerIdCounter idCounter, Collection resourceAsks, - Set blacklist, ApplicationAttemptId appAttId, - Map allNodes, String userName) throws YarnException { + public Map> allocate( + DistributedSchedulerParams appParams, ContainerIdCounter idCounter, + Collection resourceAsks, Set blacklist, + ApplicationAttemptId appAttId, Map allNodes, + String userName) throws YarnException { Map> containers = new HashMap<>(); Set nodesAllocated = new HashSet<>(); - int numAsks = resourceAsks.size(); for (ResourceRequest anyAsk : resourceAsks) { allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, allNodes, userName, containers, nodesAllocated, anyAsk); - } - if (numAsks > 0) { - LOG.info("Opportunistic allocation requested for: " + numAsks - + " containers; allocated = " + containers.size()); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); } return containers; } - private void allocateOpportunisticContainers(DistSchedulerParams appParams, - ContainerIdCounter idCounter, Set blacklist, - ApplicationAttemptId id, Map allNodes, String userName, + private void allocateOpportunisticContainers( + DistributedSchedulerParams appParams, ContainerIdCounter idCounter, + Set blacklist, ApplicationAttemptId id, + Map allNodes, String userName, Map> containers, Set nodesAllocated, ResourceRequest anyAsk) throws YarnException { int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? - 0 : containers.get(anyAsk.getCapability()).size()); + - (containers.isEmpty() ? 0 : + containers.get(anyAsk.getCapability()).size()); List topKNodesLeft = new ArrayList<>(); for (String s : allNodes.keySet()) { @@ -129,11 +133,12 @@ private void allocateOpportunisticContainers(DistSchedulerParams appParams, } cList.add(container); numAllocated++; - LOG.info("Allocated " + numAllocated + " opportunistic containers."); + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); } - private Container buildContainer(DistSchedulerParams appParams, + private Container buildContainer(DistributedSchedulerParams appParams, ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, String userName, NodeId nodeId) throws YarnException { ContainerId cId = @@ -145,12 +150,13 @@ private Container buildContainer(DistSchedulerParams appParams, long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier( - cId, nodeId.getHost(), userName, capability, - currTime + appParams.containerTokenExpiryInterval, - context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), - nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + new ContainerTokenIdentifier(cId, nodeId.getHost() + ":" + nodeId + .getPort(), userName, capability, currTime + + appParams.containerTokenExpiryInterval, context + .getContainerTokenSecretManager().getCurrentKey() + .getKeyId(), nodeStatusUpdater.getRMIdentifier(), rr + .getPriority(), currTime, null, + CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, ExecutionType.OPPORTUNISTIC); byte[] pwd = context.getContainerTokenSecretManager().createPassword( @@ -163,7 +169,7 @@ private Container buildContainer(DistSchedulerParams appParams, return container; } - private Resource normalizeCapability(DistSchedulerParams appParams, + private Resource normalizeCapability(DistributedSchedulerParams appParams, ResourceRequest ask) { return Resources.normalize(RESOURCE_CALCULATOR, ask.getCapability(), appParams.minResource, appParams.maxResource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index efc682a..33034c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,11 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords .RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords @@ -45,6 +46,7 @@ .NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security .NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -102,15 +104,15 @@ public void setBytes(ByteBuffer bytes) {} RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); localScheduler.setNextInterceptor(finalReqIntcptr); - DistSchedRegisterResponse distSchedRegisterResponse = - Records.newRecord(DistSchedRegisterResponse.class); + DistributedSchedulerRegisterResponse distSchedRegisterResponse = + Records.newRecord(DistributedSchedulerRegisterResponse.class); distSchedRegisterResponse.setRegisterResponse( Records.newRecord(RegisterApplicationMasterResponse.class)); distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); distSchedRegisterResponse.setContainerIdStart(0); - distSchedRegisterResponse.setMaxAllocatableCapabilty( + distSchedRegisterResponse.setMaxContainerResource( Resource.newInstance(1024, 4)); - distSchedRegisterResponse.setMinAllocatableCapabilty( + distSchedRegisterResponse.setMinContainerResource( Resource.newInstance(512, 2)); distSchedRegisterResponse.setNodesForScheduling(Arrays.asList( NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); @@ -125,9 +127,9 @@ public void setBytes(ByteBuffer bytes) {} Mockito.when( finalReqIntcptr.allocateForDistributedScheduling( Mockito.any(AllocateRequest.class))) - .thenAnswer(new Answer() { + .thenAnswer(new Answer() { @Override - public DistSchedAllocateResponse answer(InvocationOnMock + public DistributedSchedulerAllocateResponse answer(InvocationOnMock invocationOnMock) throws Throwable { return createAllocateResponse(Arrays.asList( NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); @@ -186,9 +188,9 @@ public DistSchedAllocateResponse answer(InvocationOnMock Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); } - private DistSchedAllocateResponse createAllocateResponse(List nodes) { - DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord - (DistSchedAllocateResponse.class); + private DistributedSchedulerAllocateResponse createAllocateResponse(List nodes) { + DistributedSchedulerAllocateResponse distSchedAllocateResponse = Records.newRecord + (DistributedSchedulerAllocateResponse.class); distSchedAllocateResponse.setAllocateResponse( Records.newRecord(AllocateResponse.class)); distSchedAllocateResponse.setNodesForScheduling(nodes); @@ -196,9 +198,14 @@ private DistSchedAllocateResponse createAllocateResponse(List nodes) { } private Map> mapAllocs(AllocateResponse - allocateResponse) { + allocateResponse) throws Exception { Map> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { + ContainerTokenIdentifier cTokId = BuilderUtils + .newContainerTokenIdentifier(c.getContainerToken()); + Assert.assertEquals( + c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), + cTokId.getNmHostAddress()); List cIds = allocs.get(c.getNodeId()); if (cIds == null) { cIds = new ArrayList<>(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4f90fa0..ac98c57 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -91,8 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.security - .AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index a93f683..4d6301d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -32,8 +32,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -77,20 +77,20 @@ private static final Log LOG = LogFactory.getLog(DistributedSchedulingService.class); - private final NodeQueueLoadMonitor nodeMonitor; + private final NodeQueueLoadMonitor nodeQueueLoadMonitor; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); private final ConcurrentHashMap> hostToNode = new ConcurrentHashMap<>(); - private final int k; + private final int nodesNo; public DistributedSchedulingService(RMContext rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - this.k = rmContext.getYarnConfiguration().getInt( - YarnConfiguration.DIST_SCHEDULING_TOP_K, - YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); + this.nodesNo = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED, + YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT); long nodeSortInterval = rmContext.getYarnConfiguration().getLong( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, YarnConfiguration. @@ -134,7 +134,7 @@ public DistributedSchedulingService(RMContext rmContext, } topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); - this.nodeMonitor = topKSelector; + this.nodeQueueLoadMonitor = topKSelector; } @Override @@ -144,8 +144,8 @@ public Server getServer(YarnRPC rpc, Configuration serverConf, addr, serverConf, secretManager, serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); - // To support application running on NMs that DO NOT support - // Dist Scheduling... The server multiplexes both the + // To support applications running on NMs that DO NOT support + // Distributed Scheduling... The server multiplexes both the // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, ApplicationMasterProtocolPB.class, @@ -175,43 +175,45 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException { RegisterApplicationMasterResponse response = registerApplicationMaster(request); - DistSchedRegisterResponse dsResp = recordFactory - .newRecordInstance(DistSchedRegisterResponse.class); + DistributedSchedulerRegisterResponse dsResp = recordFactory + .newRecordInstance(DistributedSchedulerRegisterResponse.class); dsResp.setRegisterResponse(response); - dsResp.setMinAllocatableCapabilty( + dsResp.setMinContainerResource( Resource.newInstance( getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, - YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT), + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB, + YarnConfiguration. + DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT), getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, - YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT) + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT) ) ); - dsResp.setMaxAllocatableCapabilty( + dsResp.setMaxContainerResource( Resource.newInstance( getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT), + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB, + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT), getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, - YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT) + YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT) ) ); - dsResp.setIncrAllocatableCapabilty( + dsResp.setIncrContainerResource( Resource.newInstance( getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY, - YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT), + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB, + YarnConfiguration. + DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT), getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_VCORES, - YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT) + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT) ) ); dsResp.setContainerTokenExpiryInterval( @@ -224,19 +226,19 @@ public AllocateResponse allocate(AllocateRequest request) throws // Set nodes to be used for scheduling dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); + this.nodeQueueLoadMonitor.selectLeastLoadedNodes(this.nodesNo)); return dsResp; } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { AllocateResponse response = allocate(request); - DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance - (DistSchedAllocateResponse.class); + DistributedSchedulerAllocateResponse dsResp = recordFactory.newRecordInstance + (DistributedSchedulerAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); + this.nodeQueueLoadMonitor.selectLeastLoadedNodes(this.nodesNo)); return dsResp; } @@ -269,7 +271,7 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; - nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeQueueLoadMonitor.addNode(nodeAddedEvent.getContainerReports(), nodeAddedEvent.getAddedRMNode()); addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), nodeAddedEvent.getAddedRMNode().getNodeID()); @@ -282,7 +284,7 @@ public void handle(SchedulerEvent event) { } NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent) event; - nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + nodeQueueLoadMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); removeFromMapping(rackToNode, nodeRemovedEvent.getRemovedRMNode().getRackName(), nodeRemovedEvent.getRemovedRMNode().getNodeID()); @@ -296,7 +298,7 @@ public void handle(SchedulerEvent event) { } NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) event; - nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + nodeQueueLoadMonitor.updateNode(nodeUpdatedEvent.getRMNode()); break; case NODE_RESOURCE_UPDATE: if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { @@ -304,7 +306,8 @@ public void handle(SchedulerEvent event) { } NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = (NodeResourceUpdateSchedulerEvent) event; - nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeQueueLoadMonitor.updateNodeResource( + nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); break; @@ -330,6 +333,6 @@ public void handle(SchedulerEvent event) { } public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return nodeMonitor.getThresholdCalculator(); + return nodeQueueLoadMonitor.getThresholdCalculator(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index f9d3325..423e296 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1147,7 +1147,7 @@ protected ApplicationMasterService createApplicationMasterService() { EventDispatcher distSchedulerEventDispatcher = new EventDispatcher(distributedSchedulingService, DistributedSchedulingService.class.getName()); - // Add an event dispoatcher for the DistributedSchedulingService + // Add an event dispatcher for the DistributedSchedulingService // to handle node updates/additions and removals. // Since the SchedulerEvent is currently a super set of theses, // we register interest for it.. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 463bebd..5952cc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -93,8 +93,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.queue = queue; this.user = user; this.activeUsersManager = activeUsersManager; - this.containerIdCounter = - new AtomicLong(epoch << EPOCH_BIT_SHIFT); + this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 21f4f6e..017a256 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -195,11 +195,11 @@ public void updateNode(RMNode rmNode) { new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } @@ -210,12 +210,14 @@ public void updateNode(RMNode rmNode) { .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " + + "with queue wait time [" + estimatedQueueWaitTime + "] and " + + "wait queue length [" + waitQueueLength + "]"); + } } else { this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + currentNode.queueWaitTime + "] and " + "wait queue length [" + currentNode.queueLength + "]"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 49bf4d0..418d7ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -25,21 +25,16 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .AllocateResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .FinishApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .FinishApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .RegisterApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb - .RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -52,12 +47,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb - .DistSchedAllocateResponsePBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb - .DistSchedRegisterResponsePBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt - .AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.junit.Assert; import org.junit.Test; @@ -125,21 +117,21 @@ public AllocateResponse allocate(AllocateRequest request) throws } @Override - public DistSchedRegisterResponse + public DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { - DistSchedRegisterResponse resp = factory.newRecordInstance( - DistSchedRegisterResponse.class); + DistributedSchedulerRegisterResponse resp = factory.newRecordInstance( + DistributedSchedulerRegisterResponse.class); resp.setContainerIdStart(54321l); return resp; } @Override - public DistSchedAllocateResponse allocateForDistributedScheduling + public DistributedSchedulerAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { - DistSchedAllocateResponse resp = - factory.newRecordInstance(DistSchedAllocateResponse.class); + DistributedSchedulerAllocateResponse resp = + factory.newRecordInstance(DistributedSchedulerAllocateResponse.class); resp.setNodesForScheduling( Arrays.asList(NodeId.newInstance("h1", 1234))); return resp; @@ -187,15 +179,15 @@ public AllocateResponse allocate(AllocateRequest request) throws RPC.getProxy(DistributedSchedulerProtocolPB .class, 1, NetUtils.getConnectAddress(server), conf); - DistSchedRegisterResponse dsRegResp = - new DistSchedRegisterResponsePBImpl( + DistributedSchedulerRegisterResponse dsRegResp = + new DistributedSchedulerRegisterResponsePBImpl( dsProxy.registerApplicationMasterForDistributedScheduling(null, ((RegisterApplicationMasterRequestPBImpl)factory .newRecordInstance(RegisterApplicationMasterRequest.class)) .getProto())); Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); - DistSchedAllocateResponse dsAllocResp = - new DistSchedAllocateResponsePBImpl( + DistributedSchedulerAllocateResponse dsAllocResp = + new DistributedSchedulerAllocateResponsePBImpl( dsProxy.allocateForDistributedScheduling(null, ((AllocateRequestPBImpl)factory .newRecordInstance(AllocateRequest.class)).getProto())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2372ea2..de4e22d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; @@ -713,8 +716,14 @@ protected ContainerManagerImpl createContainerManager(Context context, ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - return new CustomContainerManagerImpl(context, exec, del, - nodeStatusUpdater, metrics, dirsHandler); + if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, + YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { + return new CustomQueueingContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } else { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } } @@ -846,6 +855,55 @@ protected void createAMRMProxyService(Configuration conf) { } } + private class CustomQueueingContainerManagerImpl extends + QueuingContainerManagerImpl { + + public CustomQueueingContainerManagerImpl(Context context, + ContainerExecutor exec, DeletionService del, NodeStatusUpdater + nodeStatusUpdater, NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected ContainersMonitor createContainersMonitor(ContainerExecutor + exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + + @Override + public void increaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public void decreaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public boolean hasResourcesAvailable( + ContainersMonitorImpl.ProcessTreeInfo pti) { + return true; + } + }; + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + private class ShortCircuitedAMRMProxy extends AMRMProxyService { public ShortCircuitedAMRMProxy(Context context,