diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cfe2897..5f90224 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -296,53 +296,60 @@ public static boolean isAclEnabled(Configuration conf) {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
- /** Is Distributed Scheduling Enabled. */
+ /** Setting that controls whether distributed scheduling is enabled or not. */
public static final String DIST_SCHEDULING_ENABLED =
YARN_PREFIX + "distributed-scheduling.enabled";
public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
- /** Mininum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_MEMORY =
- YARN_PREFIX + "distributed-scheduling.min-memory";
- public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
-
- /** Mininum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MIN_VCORES =
- YARN_PREFIX + "distributed-scheduling.min-vcores";
- public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
-
- /** Maximum allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_MEMORY =
- YARN_PREFIX + "distributed-scheduling.max-memory";
- public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
-
- /** Maximum allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_MAX_VCORES =
- YARN_PREFIX + "distributed-scheduling.max-vcores";
- public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
-
- /** Incremental allocatable container memory for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_MEMORY =
- YARN_PREFIX + "distributed-scheduling.incr-memory";
- public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
-
- /** Incremental allocatable container vcores for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_INCR_VCORES =
+ /** Minimum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.min-container-memory-mb";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512;
+
+ /** Minimum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.min-container-vcores";
+ public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1;
+
+ /** Maximum memory (in MB) used for allocating a container through distributed
+ * scheduling. */
+ public static final String DIST_SCHEDULING_MAX_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.max-container-memory-mb";
+ public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048;
+
+ /** Maximum virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES =
+ YARN_PREFIX + "distributed-scheduling.max-container-vcores";
+ public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4;
+
+ /** Incremental memory (in MB) used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB =
+ YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb";
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT =
+ 512;
+
+ /** Incremental virtual CPU cores used for allocating a container through
+ * distributed scheduling. */
+ public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES =
YARN_PREFIX + "distributed-scheduling.incr-vcores";
- public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+ public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1;
- /** Container token expiry for container allocated via Distributed
- * Scheduling. */
+ /** Container token expiry for container allocated via distributed
+ * scheduling. */
public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
- YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+ YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000;
- /** K least loaded nodes to be provided to the LocalScheduler of a
- * NodeManager for Distributed Scheduling. */
- public static final String DIST_SCHEDULING_TOP_K =
- YARN_PREFIX + "distributed-scheduling.top-k";
- public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+ /** Number of nodes to be used by the LocalScheduler of a NodeManager for
+ * dispatching containers during distributed scheduling. */
+ public static final String DIST_SCHEDULING_NODES_NUMBER_USED =
+ YARN_PREFIX + "distributed-scheduling.nodes-used";
+ public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10;
/** Frequency for computing least loaded NMs. */
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
@@ -373,13 +380,13 @@ public static boolean isAclEnabled(Configuration conf) {
YARN_PREFIX + "nm-container-queuing.max-queue-length";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
- /** Min wait time of container queue at NodeManager. */
+ /** Min queue wait time for a container at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
1;
- /** Max wait time of container queue at NodeManager. */
+ /** Max queue wait time for a container queue at a NodeManager. */
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
@@ -1624,17 +1631,21 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
+ "application.classpath";
+ /** The setting that controls whether AMRMProxy is enabled or not. */
public static final String AMRM_PROXY_ENABLED = NM_PREFIX
- + "amrmproxy.enable";
+ + "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+ DEFAULT_AMRM_PROXY_PORT;
+
public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+ "amrmproxy.client.thread-count";
public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+
public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 61b698d..a2c7eeb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -119,21 +119,21 @@ public void initializeMemberVariables() {
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
+ .add(YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
+ .add(YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
+ .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
+ .add(YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
+ .add(YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
+ .add(YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES);
configurationPrefixToSkipCompare
- .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
+ .add(YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
configurationPrefixToSkipCompare
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
index 490c25b..8489834 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -23,8 +23,8 @@
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -55,7 +55,7 @@
@Public
@Unstable
@Idempotent
- DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
+ DistributedSchedulerRegisterResponse registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request)
throws YarnException, IOException;
@@ -73,6 +73,6 @@ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
@Public
@Unstable
@Idempotent
- DistSchedAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulerAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
index c1dd9e5..563ccf7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -25,26 +25,20 @@
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-
-
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@@ -76,14 +70,14 @@ public void close() {
}
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto();
try {
- return new DistSchedRegisterResponsePBImpl(
+ return new DistributedSchedulerRegisterResponsePBImpl(
proxy.registerApplicationMasterForDistributedScheduling(
null, requestProto));
} catch (ServiceException e) {
@@ -93,12 +87,12 @@ public void close() {
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
YarnServiceProtos.AllocateRequestProto requestProto =
((AllocateRequestPBImpl) request).getProto();
try {
- return new DistSchedAllocateResponsePBImpl(
+ return new DistributedSchedulerAllocateResponsePBImpl(
proxy.allocateForDistributedScheduling(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
index 8be2893..1ecdad0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -24,15 +24,15 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
@@ -57,16 +57,16 @@ public DistributedSchedulerProtocolPBServiceImpl(
}
@Override
- public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+ public YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto
registerApplicationMasterForDistributedScheduling(RpcController controller,
RegisterApplicationMasterRequestProto proto) throws
ServiceException {
RegisterApplicationMasterRequestPBImpl request = new
RegisterApplicationMasterRequestPBImpl(proto);
try {
- DistSchedRegisterResponse response =
+ DistributedSchedulerRegisterResponse response =
real.registerApplicationMasterForDistributedScheduling(request);
- return ((DistSchedRegisterResponsePBImpl) response).getProto();
+ return ((DistributedSchedulerRegisterResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
@@ -75,14 +75,14 @@ public DistributedSchedulerProtocolPBServiceImpl(
}
@Override
- public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+ public YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto
allocateForDistributedScheduling(RpcController controller,
AllocateRequestProto proto) throws ServiceException {
AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
try {
- DistSchedAllocateResponse response = real
+ DistributedSchedulerAllocateResponse response = real
.allocateForDistributedScheduling(request);
- return ((DistSchedAllocateResponsePBImpl) response).getProto();
+ return ((DistributedSchedulerAllocateResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
deleted file mode 100644
index 5f6e069..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.util.List;
-
-@Public
-@Unstable
-public abstract class DistSchedAllocateResponse {
-
- @Public
- @Unstable
- public static DistSchedAllocateResponse newInstance(AllocateResponse
- allResp) {
- DistSchedAllocateResponse response =
- Records.newRecord(DistSchedAllocateResponse.class);
- response.setAllocateResponse(allResp);
- return response;
- }
-
- @Public
- @Unstable
- public abstract void setAllocateResponse(AllocateResponse response);
-
- @Public
- @Unstable
- public abstract AllocateResponse getAllocateResponse();
-
- @Public
- @Unstable
- public abstract void setNodesForScheduling(List nodesForScheduling);
-
- @Public
- @Unstable
- public abstract List getNodesForScheduling();
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
deleted file mode 100644
index e4e5138..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.protocolrecords
- .RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.util.List;
-
-@Public
-@Unstable
-public abstract class DistSchedRegisterResponse {
-
- @Public
- @Unstable
- public static DistSchedRegisterResponse newInstance
- (RegisterApplicationMasterResponse regAMResp) {
- DistSchedRegisterResponse response =
- Records.newRecord(DistSchedRegisterResponse.class);
- response.setRegisterResponse(regAMResp);
- return response;
- }
-
- @Public
- @Unstable
- public abstract void setRegisterResponse(
- RegisterApplicationMasterResponse resp);
-
- @Public
- @Unstable
- public abstract RegisterApplicationMasterResponse getRegisterResponse();
-
- @Public
- @Unstable
- public abstract void setMinAllocatableCapabilty(Resource minResource);
-
- @Public
- @Unstable
- public abstract Resource getMinAllocatableCapabilty();
-
- @Public
- @Unstable
- public abstract void setMaxAllocatableCapabilty(Resource maxResource);
-
- @Public
- @Unstable
- public abstract Resource getMaxAllocatableCapabilty();
-
- @Public
- @Unstable
- public abstract void setIncrAllocatableCapabilty(Resource maxResource);
-
- @Public
- @Unstable
- public abstract Resource getIncrAllocatableCapabilty();
-
- @Public
- @Unstable
- public abstract void setContainerTokenExpiryInterval(int interval);
-
- @Public
- @Unstable
- public abstract int getContainerTokenExpiryInterval();
-
- @Public
- @Unstable
- public abstract void setContainerIdStart(long containerIdStart);
-
- @Public
- @Unstable
- public abstract long getContainerIdStart();
-
- @Public
- @Unstable
- public abstract void setNodesForScheduling(List nodesForScheduling);
-
- @Public
- @Unstable
- public abstract List getNodesForScheduling();
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java
new file mode 100644
index 0000000..29fd9e3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerAllocateResponse.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistributedSchedulerAllocateResponse {
+
+ @Public
+ @Unstable
+ public static DistributedSchedulerAllocateResponse newInstance(
+ AllocateResponse allResp) {
+ DistributedSchedulerAllocateResponse response =
+ Records.newRecord(DistributedSchedulerAllocateResponse.class);
+ response.setAllocateResponse(allResp);
+ return response;
+ }
+
+ @Public
+ @Unstable
+ public abstract void setAllocateResponse(AllocateResponse response);
+
+ @Public
+ @Unstable
+ public abstract AllocateResponse getAllocateResponse();
+
+ @Public
+ @Unstable
+ public abstract void setNodesForScheduling(List nodesForScheduling);
+
+ @Public
+ @Unstable
+ public abstract List getNodesForScheduling();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java
new file mode 100644
index 0000000..1db5167
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistributedSchedulerRegisterResponse.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+@Public
+@Unstable
+public abstract class DistributedSchedulerRegisterResponse {
+
+ @Public
+ @Unstable
+ public static DistributedSchedulerRegisterResponse newInstance
+ (RegisterApplicationMasterResponse regAMResp) {
+ DistributedSchedulerRegisterResponse response =
+ Records.newRecord(DistributedSchedulerRegisterResponse.class);
+ response.setRegisterResponse(regAMResp);
+ return response;
+ }
+
+ @Public
+ @Unstable
+ public abstract void setRegisterResponse(
+ RegisterApplicationMasterResponse resp);
+
+ @Public
+ @Unstable
+ public abstract RegisterApplicationMasterResponse getRegisterResponse();
+
+ @Public
+ @Unstable
+ public abstract void setMinContainerResource(Resource minResource);
+
+ @Public
+ @Unstable
+ public abstract Resource getMinContainerResource();
+
+ @Public
+ @Unstable
+ public abstract void setMaxContainerResource(Resource maxResource);
+
+ @Public
+ @Unstable
+ public abstract Resource getMaxContainerResource();
+
+ @Public
+ @Unstable
+ public abstract void setIncrContainerResource(Resource maxResource);
+
+ @Public
+ @Unstable
+ public abstract Resource getIncrContainerResource();
+
+ @Public
+ @Unstable
+ public abstract void setContainerTokenExpiryInterval(int interval);
+
+ @Public
+ @Unstable
+ public abstract int getContainerTokenExpiryInterval();
+
+ @Public
+ @Unstable
+ public abstract void setContainerIdStart(long containerIdStart);
+
+ @Public
+ @Unstable
+ public abstract long getContainerIdStart();
+
+ @Public
+ @Unstable
+ public abstract void setNodesForScheduling(List nodesForScheduling);
+
+ @Public
+ @Unstable
+ public abstract List getNodesForScheduling();
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
deleted file mode 100644
index 3ea4965..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedAllocateResponse;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
-
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
- boolean viaProto = false;
-
- private AllocateResponse allocateResponse;
- private List nodesForScheduling;
-
- public DistSchedAllocateResponsePBImpl() {
- builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
- }
-
- public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private synchronized void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private synchronized void mergeLocalToBuilder() {
- if (this.nodesForScheduling != null) {
- builder.clearNodesForScheduling();
- Iterable iterable =
- getNodeIdProtoIterable(this.nodesForScheduling);
- builder.addAllNodesForScheduling(iterable);
- }
- if (this.allocateResponse != null) {
- builder.setAllocateResponse(
- ((AllocateResponsePBImpl)this.allocateResponse).getProto());
- }
- }
- @Override
- public void setAllocateResponse(AllocateResponse response) {
- maybeInitBuilder();
- if(allocateResponse == null) {
- builder.clearAllocateResponse();
- }
- this.allocateResponse = response;
- }
-
- @Override
- public AllocateResponse getAllocateResponse() {
- if (this.allocateResponse != null) {
- return this.allocateResponse;
- }
-
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
- viaProto ? proto : builder;
- if (!p.hasAllocateResponse()) {
- return null;
- }
-
- this.allocateResponse =
- new AllocateResponsePBImpl(p.getAllocateResponse());
- return this.allocateResponse;
- }
-
- @Override
- public void setNodesForScheduling(List nodesForScheduling) {
- maybeInitBuilder();
- if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
- if (this.nodesForScheduling != null) {
- this.nodesForScheduling.clear();
- }
- builder.clearNodesForScheduling();
- return;
- }
- this.nodesForScheduling = new ArrayList<>();
- this.nodesForScheduling.addAll(nodesForScheduling);
- }
-
- @Override
- public List getNodesForScheduling() {
- if (nodesForScheduling != null) {
- return nodesForScheduling;
- }
- initLocalNodesForSchedulingList();
- return nodesForScheduling;
- }
-
- private synchronized void initLocalNodesForSchedulingList() {
- YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
- viaProto ? proto : builder;
- List list = p.getNodesForSchedulingList();
- nodesForScheduling = new ArrayList<>();
- if (list != null) {
- for (YarnProtos.NodeIdProto t : list) {
- nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
- }
- }
- }
- private synchronized Iterable getNodeIdProtoIterable(
- final List nodeList) {
- maybeInitBuilder();
- return new Iterable() {
- @Override
- public synchronized Iterator iterator() {
- return new Iterator() {
-
- Iterator iter = nodeList.iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public YarnProtos.NodeIdProto next() {
- return ProtoUtils.convertToProtoFormat(iter.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
deleted file mode 100644
index 0322c70..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import com.google.protobuf.TextFormat;
-
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedRegisterResponse;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
- boolean viaProto = false;
-
- private Resource maxAllocatableCapability;
- private Resource minAllocatableCapability;
- private Resource incrAllocatableCapability;
- private List nodesForScheduling;
- private RegisterApplicationMasterResponse registerApplicationMasterResponse;
-
- public DistSchedRegisterResponsePBImpl() {
- builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
- }
-
- public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private synchronized void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private synchronized void mergeLocalToBuilder() {
- if (this.nodesForScheduling != null) {
- builder.clearNodesForScheduling();
- Iterable iterable =
- getNodeIdProtoIterable(this.nodesForScheduling);
- builder.addAllNodesForScheduling(iterable);
- }
- if (this.maxAllocatableCapability != null) {
- builder.setMaxAllocCapability(
- ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
- }
- if (this.minAllocatableCapability != null) {
- builder.setMaxAllocCapability(
- ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
- }
- if (this.registerApplicationMasterResponse != null) {
- builder.setRegisterResponse(
- ((RegisterApplicationMasterResponsePBImpl)
- this.registerApplicationMasterResponse).getProto());
- }
- }
-
- @Override
- public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
- maybeInitBuilder();
- if(registerApplicationMasterResponse == null) {
- builder.clearRegisterResponse();
- }
- this.registerApplicationMasterResponse = resp;
- }
-
- @Override
- public RegisterApplicationMasterResponse getRegisterResponse() {
- if (this.registerApplicationMasterResponse != null) {
- return this.registerApplicationMasterResponse;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasRegisterResponse()) {
- return null;
- }
-
- this.registerApplicationMasterResponse =
- new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
- return this.registerApplicationMasterResponse;
- }
-
- @Override
- public void setMaxAllocatableCapabilty(Resource maxResource) {
- maybeInitBuilder();
- if(maxAllocatableCapability == null) {
- builder.clearMaxAllocCapability();
- }
- this.maxAllocatableCapability = maxResource;
- }
-
- @Override
- public Resource getMaxAllocatableCapabilty() {
- if (this.maxAllocatableCapability != null) {
- return this.maxAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasMaxAllocCapability()) {
- return null;
- }
-
- this.maxAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
- return this.maxAllocatableCapability;
- }
-
- @Override
- public void setMinAllocatableCapabilty(Resource minResource) {
- maybeInitBuilder();
- if(minAllocatableCapability == null) {
- builder.clearMinAllocCapability();
- }
- this.minAllocatableCapability = minResource;
- }
-
- @Override
- public Resource getMinAllocatableCapabilty() {
- if (this.minAllocatableCapability != null) {
- return this.minAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasMinAllocCapability()) {
- return null;
- }
-
- this.minAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
- return this.minAllocatableCapability;
- }
-
- @Override
- public void setIncrAllocatableCapabilty(Resource incrResource) {
- maybeInitBuilder();
- if(incrAllocatableCapability == null) {
- builder.clearIncrAllocCapability();
- }
- this.incrAllocatableCapability = incrResource;
- }
-
- @Override
- public Resource getIncrAllocatableCapabilty() {
- if (this.incrAllocatableCapability != null) {
- return this.incrAllocatableCapability;
- }
-
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasIncrAllocCapability()) {
- return null;
- }
-
- this.incrAllocatableCapability =
- ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
- return this.incrAllocatableCapability;
- }
-
- @Override
- public void setContainerTokenExpiryInterval(int interval) {
- maybeInitBuilder();
- builder.setContainerTokenExpiryInterval(interval);
- }
-
- @Override
- public int getContainerTokenExpiryInterval() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasContainerTokenExpiryInterval()) {
- return 0;
- }
- return p.getContainerTokenExpiryInterval();
- }
-
- @Override
- public void setContainerIdStart(long containerIdStart) {
- maybeInitBuilder();
- builder.setContainerIdStart(containerIdStart);
- }
-
- @Override
- public long getContainerIdStart() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasContainerIdStart()) {
- return 0;
- }
- return p.getContainerIdStart();
- }
-
-
- @Override
- public void setNodesForScheduling(List nodesForScheduling) {
- maybeInitBuilder();
- if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
- if (this.nodesForScheduling != null) {
- this.nodesForScheduling.clear();
- }
- builder.clearNodesForScheduling();
- return;
- }
- this.nodesForScheduling = new ArrayList<>();
- this.nodesForScheduling.addAll(nodesForScheduling);
- }
-
- @Override
- public List getNodesForScheduling() {
- if (nodesForScheduling != null) {
- return nodesForScheduling;
- }
- initLocalNodesForSchedulingList();
- return nodesForScheduling;
- }
-
- private synchronized void initLocalNodesForSchedulingList() {
- YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
- List list = p.getNodesForSchedulingList();
- nodesForScheduling = new ArrayList<>();
- if (list != null) {
- for (YarnProtos.NodeIdProto t : list) {
- nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
- }
- }
- }
- private synchronized Iterable getNodeIdProtoIterable(
- final List nodeList) {
- maybeInitBuilder();
- return new Iterable() {
- @Override
- public synchronized Iterator iterator() {
- return new Iterator() {
-
- Iterator iter = nodeList.iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public YarnProtos.NodeIdProto next() {
- return ProtoUtils.convertToProtoFormat(iter.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-
- @Override
- public String toString() {
- return TextFormat.shortDebugString(getProto());
- }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java
new file mode 100644
index 0000000..6e0b810
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerAllocateResponsePBImpl.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistributedSchedulerAllocateResponsePBImpl extends
+ DistributedSchedulerAllocateResponse {
+
+ YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto
+ proto = YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProto.getDefaultInstance();
+ YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto.
+ Builder builder = null;
+ boolean viaProto = false;
+
+ private AllocateResponse allocateResponse;
+ private List nodesForScheduling;
+
+ public DistributedSchedulerAllocateResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProto.newBuilder();
+ }
+
+ public DistributedSchedulerAllocateResponsePBImpl(
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonServiceProtos.DistributedSchedulerAllocateResponseProto
+ getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.nodesForScheduling != null) {
+ builder.clearNodesForScheduling();
+ Iterable iterable = getNodeIdProtoIterable(
+ this.nodesForScheduling);
+ builder.addAllNodesForScheduling(iterable);
+ }
+ if (this.allocateResponse != null) {
+ builder.setAllocateResponse(
+ ((AllocateResponsePBImpl) this.allocateResponse).getProto());
+ }
+ }
+
+ @Override
+ public void setAllocateResponse(AllocateResponse response) {
+ maybeInitBuilder();
+ if (allocateResponse == null) {
+ builder.clearAllocateResponse();
+ }
+ this.allocateResponse = response;
+ }
+
+ @Override
+ public AllocateResponse getAllocateResponse() {
+ if (this.allocateResponse != null) {
+ return this.allocateResponse;
+ }
+
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasAllocateResponse()) {
+ return null;
+ }
+
+ this.allocateResponse = new AllocateResponsePBImpl(p.getAllocateResponse());
+ return this.allocateResponse;
+ }
+
+ @Override
+ public void setNodesForScheduling(List nodesForScheduling) {
+ maybeInitBuilder();
+ if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+ if (this.nodesForScheduling != null) {
+ this.nodesForScheduling.clear();
+ }
+ builder.clearNodesForScheduling();
+ return;
+ }
+ this.nodesForScheduling = new ArrayList<>();
+ this.nodesForScheduling.addAll(nodesForScheduling);
+ }
+
+ @Override
+ public List getNodesForScheduling() {
+ if (nodesForScheduling != null) {
+ return nodesForScheduling;
+ }
+ initLocalNodesForSchedulingList();
+ return nodesForScheduling;
+ }
+
+ private synchronized void initLocalNodesForSchedulingList() {
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getNodesForSchedulingList();
+ nodesForScheduling = new ArrayList<>();
+ if (list != null) {
+ for (YarnProtos.NodeIdProto t : list) {
+ nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ }
+ }
+ }
+
+ private synchronized Iterable getNodeIdProtoIterable(
+ final List nodeList) {
+ maybeInitBuilder();
+ return new Iterable() {
+ @Override
+ public synchronized Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = nodeList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public YarnProtos.NodeIdProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java
new file mode 100644
index 0000000..5504650
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistributedSchedulerRegisterResponsePBImpl.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistributedSchedulerRegisterResponsePBImpl extends
+ DistributedSchedulerRegisterResponse {
+
+ YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto
+ proto =
+ YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto
+ .getDefaultInstance();
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private Resource maxContainerResource;
+ private Resource minContainerRequest;
+ private Resource incrContainerRequest;
+ private List nodesForScheduling;
+ private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+ public DistributedSchedulerRegisterResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProto.newBuilder();
+ }
+
+ public DistributedSchedulerRegisterResponsePBImpl(
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonServiceProtos.DistributedSchedulerRegisterResponseProto
+ getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.nodesForScheduling != null) {
+ builder.clearNodesForScheduling();
+ Iterable iterable = getNodeIdProtoIterable(
+ this.nodesForScheduling);
+ builder.addAllNodesForScheduling(iterable);
+ }
+ if (this.maxContainerResource != null) {
+ builder.setMaxContainerResource(ProtoUtils.convertToProtoFormat(
+ this.maxContainerResource));
+ }
+ if (this.minContainerRequest != null) {
+ builder.setMinContainerResource(ProtoUtils.convertToProtoFormat(
+ this.minContainerRequest));
+ }
+ if (this.registerApplicationMasterResponse != null) {
+ builder.setRegisterResponse(
+ ((RegisterApplicationMasterResponsePBImpl)
+ this.registerApplicationMasterResponse).getProto());
+ }
+ }
+
+ @Override
+ public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+ maybeInitBuilder();
+ if (registerApplicationMasterResponse == null) {
+ builder.clearRegisterResponse();
+ }
+ this.registerApplicationMasterResponse = resp;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse getRegisterResponse() {
+ if (this.registerApplicationMasterResponse != null) {
+ return this.registerApplicationMasterResponse;
+ }
+
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasRegisterResponse()) {
+ return null;
+ }
+
+ this.registerApplicationMasterResponse =
+ new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+ return this.registerApplicationMasterResponse;
+ }
+
+ @Override
+ public void setMaxContainerResource(Resource maxResource) {
+ maybeInitBuilder();
+ if (maxContainerResource == null) {
+ builder.clearMaxContainerResource();
+ }
+ this.maxContainerResource = maxResource;
+ }
+
+ @Override
+ public Resource getMaxContainerResource() {
+ if (this.maxContainerResource != null) {
+ return this.maxContainerResource;
+ }
+
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasMaxContainerResource()) {
+ return null;
+ }
+
+ this.maxContainerResource = ProtoUtils.convertFromProtoFormat(p
+ .getMaxContainerResource());
+ return this.maxContainerResource;
+ }
+
+ @Override
+ public void setMinContainerResource(Resource minResource) {
+ maybeInitBuilder();
+ if (minContainerRequest == null) {
+ builder.clearMinContainerResource();
+ }
+ this.minContainerRequest = minResource;
+ }
+
+ @Override
+ public Resource getMinContainerResource() {
+ if (this.minContainerRequest != null) {
+ return this.minContainerRequest;
+ }
+
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasMinContainerResource()) {
+ return null;
+ }
+
+ this.minContainerRequest = ProtoUtils.convertFromProtoFormat(p
+ .getMinContainerResource());
+ return this.minContainerRequest;
+ }
+
+ @Override
+ public void setIncrContainerResource(Resource incrResource) {
+ maybeInitBuilder();
+ if (incrContainerRequest == null) {
+ builder.clearIncrContainerResource();
+ }
+ this.incrContainerRequest = incrResource;
+ }
+
+ @Override
+ public Resource getIncrContainerResource() {
+ if (this.incrContainerRequest != null) {
+ return this.incrContainerRequest;
+ }
+
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasIncrContainerResource()) {
+ return null;
+ }
+
+ this.incrContainerRequest = ProtoUtils.convertFromProtoFormat(p
+ .getIncrContainerResource());
+ return this.incrContainerRequest;
+ }
+
+ @Override
+ public void setContainerTokenExpiryInterval(int interval) {
+ maybeInitBuilder();
+ builder.setContainerTokenExpiryInterval(interval);
+ }
+
+ @Override
+ public int getContainerTokenExpiryInterval() {
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasContainerTokenExpiryInterval()) {
+ return 0;
+ }
+ return p.getContainerTokenExpiryInterval();
+ }
+
+ @Override
+ public void setContainerIdStart(long containerIdStart) {
+ maybeInitBuilder();
+ builder.setContainerIdStart(containerIdStart);
+ }
+
+ @Override
+ public long getContainerIdStart() {
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasContainerIdStart()) {
+ return 0;
+ }
+ return p.getContainerIdStart();
+ }
+
+ @Override
+ public void setNodesForScheduling(List nodesForScheduling) {
+ maybeInitBuilder();
+ if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+ if (this.nodesForScheduling != null) {
+ this.nodesForScheduling.clear();
+ }
+ builder.clearNodesForScheduling();
+ return;
+ }
+ this.nodesForScheduling = new ArrayList<>();
+ this.nodesForScheduling.addAll(nodesForScheduling);
+ }
+
+ @Override
+ public List getNodesForScheduling() {
+ if (nodesForScheduling != null) {
+ return nodesForScheduling;
+ }
+ initLocalNodesForSchedulingList();
+ return nodesForScheduling;
+ }
+
+ private synchronized void initLocalNodesForSchedulingList() {
+ YarnServerCommonServiceProtos.
+ DistributedSchedulerRegisterResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List list = p.getNodesForSchedulingList();
+ nodesForScheduling = new ArrayList<>();
+ if (list != null) {
+ for (YarnProtos.NodeIdProto t : list) {
+ nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ }
+ }
+ }
+
+ private synchronized Iterable getNodeIdProtoIterable(
+ final List nodeList) {
+ maybeInitBuilder();
+ return new Iterable() {
+ @Override
+ public synchronized Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = nodeList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public YarnProtos.NodeIdProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
index a7f0ece..fb567d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -41,5 +41,5 @@ public static QueuedContainersStatus newInstance() {
public abstract int getWaitQueueLength();
- public abstract void setWaitQueueLength(int queueWaitTime);
+ public abstract void setWaitQueueLength(int waitQueueLength);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
index 7e3a77f..4622a53 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -33,6 +33,6 @@ import "yarn_server_common_service_protos.proto";
service DistributedSchedulerProtocolService {
- rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
- rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+ rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistributedSchedulerRegisterResponseProto);
+ rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistributedSchedulerAllocateResponseProto);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a7e5a86..c05f243 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,17 +26,17 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
-message DistSchedRegisterResponseProto {
+message DistributedSchedulerRegisterResponseProto {
optional RegisterApplicationMasterResponseProto register_response = 1;
- optional ResourceProto max_alloc_capability = 2;
- optional ResourceProto min_alloc_capability = 3;
- optional ResourceProto incr_alloc_capability = 4;
+ optional ResourceProto max_container_resource = 2;
+ optional ResourceProto min_container_resource = 3;
+ optional ResourceProto incr_container_resource = 4;
optional int32 container_token_expiry_interval = 5;
optional int64 container_id_start = 6;
repeated NodeIdProto nodes_for_scheduling = 7;
}
-message DistSchedAllocateResponseProto {
+message DistributedSchedulerAllocateResponseProto {
optional AllocateResponseProto allocate_response = 1;
repeated NodeIdProto nodes_for_scheduling = 2;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index ac360f4..e1abe81 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -467,8 +467,7 @@ protected RequestInterceptor createRequestInterceptorChain() {
interceptorClassNames.add(item.trim());
}
- // Make sure LocalScheduler is present at the beginning
- // of the chain..
+ // Make sure LocalScheduler is present at the beginning of the chain.
if (this.nmContext.isDistributedSchedulingEnabled()) {
interceptorClassNames.add(0, LocalScheduler.class.getName());
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 2b2a2f6..ce7fb5b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -25,8 +25,8 @@
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import java.io.IOException;
@@ -118,7 +118,7 @@ public AMRMProxyApplicationContext getApplicationContext() {
* @throws IOException
*/
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
return (this.nextInterceptor != null) ?
this.nextInterceptor.allocateForDistributedScheduling(request) : null;
@@ -134,7 +134,7 @@ public AMRMProxyApplicationContext getApplicationContext() {
* @throws IOException
*/
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 1637682..494da5d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -45,9 +45,8 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
-import org.apache.hadoop.yarn.server.api.protocolrecords
- .DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +122,7 @@ public AllocateResponse allocate(final AllocateRequest request)
}
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@@ -133,13 +132,13 @@ public AllocateResponse allocate(final AllocateRequest request)
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
}
- DistSchedAllocateResponse allocateResponse =
+ DistributedSchedulerAllocateResponse allocateResponse =
rmClient.allocateForDistributedScheduling(request);
if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
@@ -204,7 +203,7 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
@@ -212,7 +211,7 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedAllocateResponse
+ public DistributedSchedulerAllocateResponse
allocateForDistributedScheduling(AllocateRequest request) throws
YarnException, IOException {
throw new IOException("Not Supported !!");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
index 4f9d5a3..707051f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -200,6 +200,7 @@ private void startAllocatedContainer(
.getContainerTokenIdentifier().getContainerID();
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
try {
+ LOG.info("Starting container [" + containerId + "]");
super.startContainerInternal(
allocatedContainerInfo.getContainerTokenIdentifier(),
allocatedContainerInfo.getStartRequest());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 42c1dcd..4fee58c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -21,8 +21,8 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -64,16 +64,16 @@
import java.util.TreeMap;
/**
- * The LocalScheduler runs on the NodeManager and is modelled as an
+ *
The LocalScheduler runs on the NodeManager and is modeled as an
* AMRMProxy request interceptor. It is responsible for the
- * following :
+ * following:
*
* - Intercept
ApplicationMasterProtocol calls and unwrap the
* response objects to extract instructions from the
- * ClusterManager running on the ResourceManager to aid in making
- * Scheduling scheduling decisions
+ * ClusterMonitor running on the ResourceManager to aid in making
+ * distributed scheduling decisions.
* - Call the
OpportunisticContainerAllocator to allocate
- * containers for the opportunistic resource outstandingOpReqs
+ * containers for the outstanding OPPORTUNISTIC container requests.
*
*/
public final class LocalScheduler extends AbstractRequestInterceptor {
@@ -89,7 +89,7 @@
}
}
- static class DistSchedulerParams {
+ static class DistributedSchedulerParams {
Resource maxResource;
Resource minResource;
Resource incrementResource;
@@ -99,13 +99,15 @@
private static final Logger LOG = LoggerFactory
.getLogger(LocalScheduler.class);
- // Currently just used to keep track of allocated Containers
- // Can be used for reporting stats later
+ // Currently just used to keep track of allocated containers.
+ // Can be used for reporting stats later.
private Set containersAllocated = new HashSet<>();
- private DistSchedulerParams appParams = new DistSchedulerParams();
- private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter =
- new OpportunisticContainerAllocator.ContainerIdCounter();
+ private DistributedSchedulerParams appParams =
+ new DistributedSchedulerParams();
+ private final OpportunisticContainerAllocator.ContainerIdCounter
+ containerIdCounter =
+ new OpportunisticContainerAllocator.ContainerIdCounter();
private Map nodeList = new HashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
@@ -116,7 +118,7 @@
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
- // ResourceRequests (ask)
+ // ResourceRequest (ask).
final TreeMap>
outstandingOpReqs = new TreeMap<>();
@@ -189,9 +191,6 @@ public AllocateResponse allocate(AllocateRequest request) throws
/**
* Check if we already have a NMToken. if Not, generate the Token and
* add it to the response
- * @param response
- * @param nmTokens
- * @param allocatedContainers
*/
private void updateResponseWithNMTokens(AllocateResponse response,
List nmTokens, List allocatedContainers) {
@@ -224,11 +223,11 @@ private PartitionedResourceRequests partitionAskList(List
}
private void updateParameters(
- DistSchedRegisterResponse registerResponse) {
- appParams.minResource = registerResponse.getMinAllocatableCapabilty();
- appParams.maxResource = registerResponse.getMaxAllocatableCapabilty();
+ DistributedSchedulerRegisterResponse registerResponse) {
+ appParams.minResource = registerResponse.getMinContainerResource();
+ appParams.maxResource = registerResponse.getMaxContainerResource();
appParams.incrementResource =
- registerResponse.getIncrAllocatableCapabilty();
+ registerResponse.getIncrContainerResource();
if (appParams.incrementResource == null) {
appParams.incrementResource = appParams.minResource;
}
@@ -242,11 +241,10 @@ private void updateParameters(
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
- * (Priority, ResourceName, Capability) and adds it the outstanding
+ * (Priority, ResourceName, Capability) and adds to the outstanding
* OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
* the current YARN constraint that only a single ResourceRequest can exist at
- * a give Priority and Capability
- * @param resourceAsks
+ * a give Priority and Capability.
*/
public void addToOutstandingReqs(List resourceAsks) {
for (ResourceRequest request : resourceAsks) {
@@ -286,9 +284,7 @@ public void addToOutstandingReqs(List resourceAsks) {
/**
* This method matches a returned list of Container Allocations to any
- * outstanding OPPORTUNISTIC ResourceRequest
- * @param capability
- * @param allocatedContainers
+ * outstanding OPPORTUNISTIC ResourceRequest.
*/
public void matchAllocationToOutstandingRequest(Resource capability,
List allocatedContainers) {
@@ -322,23 +318,25 @@ private void addToNodeList(List nodes) {
}
@Override
- public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ public DistributedSchedulerRegisterResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
LOG.info("Forwarding registration request to the" +
- "Distributed Scheduler Service on YARN RM");
- DistSchedRegisterResponse dsResp = getNextInterceptor()
+ "Distributed Scheduling Service on YARN RM");
+ DistributedSchedulerRegisterResponse dsResp = getNextInterceptor()
.registerApplicationMasterForDistributedScheduling(request);
updateParameters(dsResp);
return dsResp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
- LOG.info("Forwarding allocate request to the" +
- "Distributed Scheduler Service on YARN RM");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Forwarding allocate request to the" +
+ "Distributed Scheduler Service on YARN RM");
+ }
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks = partitionAskList(request
.getAskList());
@@ -381,7 +379,7 @@ private void addToNodeList(List nodes) {
// Send all the GUARANTEED Reqs to RM
request.setAskList(partitionedAsks.getGuaranteed());
- DistSchedAllocateResponse dsResp =
+ DistributedSchedulerAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
// Update host to nodeId mapping
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 03ba61d..2982cbc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistributedSchedulerParams;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,12 +40,14 @@
import java.util.concurrent.atomic.AtomicLong;
/**
- * The OpportunisticContainerAllocator allocates containers on a given list
- * of Nodes after it modifies the container sizes to within allowable limits
- * specified by the ClusterManager running on the RM. It tries to
- * distribute the containers as evenly as possible. It also uses the
+ *
+ * The OpportunisticContainerAllocator allocates containers on a given list of
+ * nodes, after modifying the container sizes to respect the limits set by the
+ * ClusterManager running on the RM. It tries to distribute the
+ * containers as evenly as possible. It also uses the
* NMTokenSecretManagerInNM to generate the required NM tokens for
- * the allocated containers
+ * the allocated containers.
+ *
*/
public class OpportunisticContainerAllocator {
@@ -78,32 +80,34 @@ public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
this.webpagePort = webpagePort;
}
- public Map> allocate(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Collection resourceAsks,
- Set blacklist, ApplicationAttemptId appAttId,
- Map allNodes, String userName) throws YarnException {
+ public Map> allocate(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Collection resourceAsks, Set blacklist,
+ ApplicationAttemptId appAttId, Map allNodes,
+ String userName) throws YarnException {
Map> containers = new HashMap<>();
Set nodesAllocated = new HashSet<>();
- int numAsks = resourceAsks.size();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
allNodes, userName, containers, nodesAllocated, anyAsk);
- }
- if (numAsks > 0) {
- LOG.info("Opportunistic allocation requested for: " + numAsks
- + " containers; allocated = " + containers.size());
+ LOG.info("Opportunistic allocation requested for ["
+ + "priority=" + anyAsk.getPriority()
+ + ", num_containers=" + anyAsk.getNumContainers()
+ + ", capability=" + anyAsk.getCapability() + "]"
+ + " allocated = " + containers.get(anyAsk.getCapability()).size());
}
return containers;
}
- private void allocateOpportunisticContainers(DistSchedulerParams appParams,
- ContainerIdCounter idCounter, Set blacklist,
- ApplicationAttemptId id, Map allNodes, String userName,
+ private void allocateOpportunisticContainers(
+ DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
+ Set blacklist, ApplicationAttemptId id,
+ Map allNodes, String userName,
Map> containers, Set nodesAllocated,
ResourceRequest anyAsk) throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- - (containers.isEmpty() ?
- 0 : containers.get(anyAsk.getCapability()).size());
+ - (containers.isEmpty() ? 0 :
+ containers.get(anyAsk.getCapability()).size());
List topKNodesLeft = new ArrayList<>();
for (String s : allNodes.keySet()) {
@@ -129,11 +133,12 @@ private void allocateOpportunisticContainers(DistSchedulerParams appParams,
}
cList.add(container);
numAllocated++;
- LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+ LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
}
+ LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
- private Container buildContainer(DistSchedulerParams appParams,
+ private Container buildContainer(DistributedSchedulerParams appParams,
ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
String userName, NodeId nodeId) throws YarnException {
ContainerId cId =
@@ -145,12 +150,13 @@ private Container buildContainer(DistSchedulerParams appParams,
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
- new ContainerTokenIdentifier(
- cId, nodeId.getHost(), userName, capability,
- currTime + appParams.containerTokenExpiryInterval,
- context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
- nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
- null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+ new ContainerTokenIdentifier(cId, nodeId.getHost() + ":" + nodeId
+ .getPort(), userName, capability, currTime
+ + appParams.containerTokenExpiryInterval, context
+ .getContainerTokenSecretManager().getCurrentKey()
+ .getKeyId(), nodeStatusUpdater.getRMIdentifier(), rr
+ .getPriority(), currTime, null,
+ CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
context.getContainerTokenSecretManager().createPassword(
@@ -163,7 +169,7 @@ private Container buildContainer(DistSchedulerParams appParams,
return container;
}
- private Resource normalizeCapability(DistSchedulerParams appParams,
+ private Resource normalizeCapability(DistributedSchedulerParams appParams,
ResourceRequest ask) {
return Resources.normalize(RESOURCE_CALCULATOR,
ask.getCapability(), appParams.minResource, appParams.maxResource,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index efc682a..33034c1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -19,11 +19,12 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
@@ -45,6 +46,7 @@
.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security
.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -102,15 +104,15 @@ public void setBytes(ByteBuffer bytes) {}
RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
localScheduler.setNextInterceptor(finalReqIntcptr);
- DistSchedRegisterResponse distSchedRegisterResponse =
- Records.newRecord(DistSchedRegisterResponse.class);
+ DistributedSchedulerRegisterResponse distSchedRegisterResponse =
+ Records.newRecord(DistributedSchedulerRegisterResponse.class);
distSchedRegisterResponse.setRegisterResponse(
Records.newRecord(RegisterApplicationMasterResponse.class));
distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
distSchedRegisterResponse.setContainerIdStart(0);
- distSchedRegisterResponse.setMaxAllocatableCapabilty(
+ distSchedRegisterResponse.setMaxContainerResource(
Resource.newInstance(1024, 4));
- distSchedRegisterResponse.setMinAllocatableCapabilty(
+ distSchedRegisterResponse.setMinContainerResource(
Resource.newInstance(512, 2));
distSchedRegisterResponse.setNodesForScheduling(Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
@@ -125,9 +127,9 @@ public void setBytes(ByteBuffer bytes) {}
Mockito.when(
finalReqIntcptr.allocateForDistributedScheduling(
Mockito.any(AllocateRequest.class)))
- .thenAnswer(new Answer() {
+ .thenAnswer(new Answer() {
@Override
- public DistSchedAllocateResponse answer(InvocationOnMock
+ public DistributedSchedulerAllocateResponse answer(InvocationOnMock
invocationOnMock) throws Throwable {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
@@ -186,9 +188,9 @@ public DistSchedAllocateResponse answer(InvocationOnMock
Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));
}
- private DistSchedAllocateResponse createAllocateResponse(List nodes) {
- DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
- (DistSchedAllocateResponse.class);
+ private DistributedSchedulerAllocateResponse createAllocateResponse(List nodes) {
+ DistributedSchedulerAllocateResponse distSchedAllocateResponse = Records.newRecord
+ (DistributedSchedulerAllocateResponse.class);
distSchedAllocateResponse.setAllocateResponse(
Records.newRecord(AllocateResponse.class));
distSchedAllocateResponse.setNodesForScheduling(nodes);
@@ -196,9 +198,14 @@ private DistSchedAllocateResponse createAllocateResponse(List nodes) {
}
private Map> mapAllocs(AllocateResponse
- allocateResponse) {
+ allocateResponse) throws Exception {
Map> allocs = new HashMap<>();
for (Container c : allocateResponse.getAllocatedContainers()) {
+ ContainerTokenIdentifier cTokId = BuilderUtils
+ .newContainerTokenIdentifier(c.getContainerToken());
+ Assert.assertEquals(
+ c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
+ cTokId.getNmHostAddress());
List cIds = allocs.get(c.getNodeId());
if (cIds == null) {
cIds = new ArrayList<>();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 4f90fa0..ac98c57 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -91,8 +91,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
- .AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index a93f683..4d6301d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -32,8 +32,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -77,20 +77,20 @@
private static final Log LOG =
LogFactory.getLog(DistributedSchedulingService.class);
- private final NodeQueueLoadMonitor nodeMonitor;
+ private final NodeQueueLoadMonitor nodeQueueLoadMonitor;
private final ConcurrentHashMap> rackToNode =
new ConcurrentHashMap<>();
private final ConcurrentHashMap> hostToNode =
new ConcurrentHashMap<>();
- private final int k;
+ private final int nodesNo;
public DistributedSchedulingService(RMContext rmContext,
YarnScheduler scheduler) {
super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
- this.k = rmContext.getYarnConfiguration().getInt(
- YarnConfiguration.DIST_SCHEDULING_TOP_K,
- YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+ this.nodesNo = rmContext.getYarnConfiguration().getInt(
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
+ YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
@@ -134,7 +134,7 @@ public DistributedSchedulingService(RMContext rmContext,
}
topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
- this.nodeMonitor = topKSelector;
+ this.nodeQueueLoadMonitor = topKSelector;
}
@Override
@@ -144,8 +144,8 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
- // To support application running on NMs that DO NOT support
- // Dist Scheduling... The server multiplexes both the
+ // To support applications running on NMs that DO NOT support
+ // Distributed Scheduling... The server multiplexes both the
// ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ApplicationMasterProtocolPB.class,
@@ -175,43 +175,45 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =
registerApplicationMaster(request);
- DistSchedRegisterResponse dsResp = recordFactory
- .newRecordInstance(DistSchedRegisterResponse.class);
+ DistributedSchedulerRegisterResponse dsResp = recordFactory
+ .newRecordInstance(DistributedSchedulerRegisterResponse.class);
dsResp.setRegisterResponse(response);
- dsResp.setMinAllocatableCapabilty(
+ dsResp.setMinContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setMaxAllocatableCapabilty(
+ dsResp.setMaxContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
+ YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
)
);
- dsResp.setIncrAllocatableCapabilty(
+ dsResp.setIncrContainerResource(
Resource.newInstance(
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
+ YarnConfiguration.
+ DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
+ YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
)
);
dsResp.setContainerTokenExpiryInterval(
@@ -224,19 +226,19 @@ public AllocateResponse allocate(AllocateRequest request) throws
// Set nodes to be used for scheduling
dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
+ this.nodeQueueLoadMonitor.selectLeastLoadedNodes(this.nodesNo));
return dsResp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
AllocateResponse response = allocate(request);
- DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
- (DistSchedAllocateResponse.class);
+ DistributedSchedulerAllocateResponse dsResp = recordFactory.newRecordInstance
+ (DistributedSchedulerAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
+ this.nodeQueueLoadMonitor.selectLeastLoadedNodes(this.nodesNo));
return dsResp;
}
@@ -269,7 +271,7 @@ public void handle(SchedulerEvent event) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
- nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+ nodeQueueLoadMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
@@ -282,7 +284,7 @@ public void handle(SchedulerEvent event) {
}
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent) event;
- nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+ nodeQueueLoadMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
removeFromMapping(rackToNode,
nodeRemovedEvent.getRemovedRMNode().getRackName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
@@ -296,7 +298,7 @@ public void handle(SchedulerEvent event) {
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
event;
- nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+ nodeQueueLoadMonitor.updateNode(nodeUpdatedEvent.getRMNode());
break;
case NODE_RESOURCE_UPDATE:
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
@@ -304,7 +306,8 @@ public void handle(SchedulerEvent event) {
}
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent) event;
- nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+ nodeQueueLoadMonitor.updateNodeResource(
+ nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
break;
@@ -330,6 +333,6 @@ public void handle(SchedulerEvent event) {
}
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
- return nodeMonitor.getThresholdCalculator();
+ return nodeQueueLoadMonitor.getThresholdCalculator();
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index f9d3325..423e296 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1147,7 +1147,7 @@ protected ApplicationMasterService createApplicationMasterService() {
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
DistributedSchedulingService.class.getName());
- // Add an event dispoatcher for the DistributedSchedulingService
+ // Add an event dispatcher for the DistributedSchedulingService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 463bebd..5952cc2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -93,8 +93,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
- this.containerIdCounter =
- new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+ this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index 21f4f6e..017a256 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -195,11 +195,11 @@ public void updateNode(RMNode rmNode) {
new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength));
- LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+ LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
- LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+ LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
}
@@ -210,12 +210,14 @@ public void updateNode(RMNode rmNode) {
.setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength)
.updateTimestamp();
- LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
- "with queue wait time [" + estimatedQueueWaitTime + "] and " +
- "wait queue length [" + waitQueueLength + "]");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
+ "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+ "wait queue length [" + waitQueueLength + "]");
+ }
} else {
this.clusterNodes.remove(rmNode.getNodeID());
- LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+ LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + currentNode.queueWaitTime + "] and " +
"wait queue length [" + currentNode.queueLength + "]");
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 49bf4d0..418d7ad 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -25,21 +25,16 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
- .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulerRegisterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@@ -52,12 +47,9 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
- .DistSchedRegisterResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
- .AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulerRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Assert;
import org.junit.Test;
@@ -125,21 +117,21 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
@Override
- public DistSchedRegisterResponse
+ public DistributedSchedulerRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws
YarnException, IOException {
- DistSchedRegisterResponse resp = factory.newRecordInstance(
- DistSchedRegisterResponse.class);
+ DistributedSchedulerRegisterResponse resp = factory.newRecordInstance(
+ DistributedSchedulerRegisterResponse.class);
resp.setContainerIdStart(54321l);
return resp;
}
@Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
+ public DistributedSchedulerAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
- DistSchedAllocateResponse resp =
- factory.newRecordInstance(DistSchedAllocateResponse.class);
+ DistributedSchedulerAllocateResponse resp =
+ factory.newRecordInstance(DistributedSchedulerAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
@@ -187,15 +179,15 @@ public AllocateResponse allocate(AllocateRequest request) throws
RPC.getProxy(DistributedSchedulerProtocolPB
.class, 1, NetUtils.getConnectAddress(server), conf);
- DistSchedRegisterResponse dsRegResp =
- new DistSchedRegisterResponsePBImpl(
+ DistributedSchedulerRegisterResponse dsRegResp =
+ new DistributedSchedulerRegisterResponsePBImpl(
dsProxy.registerApplicationMasterForDistributedScheduling(null,
((RegisterApplicationMasterRequestPBImpl)factory
.newRecordInstance(RegisterApplicationMasterRequest.class))
.getProto()));
Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
- DistSchedAllocateResponse dsAllocResp =
- new DistSchedAllocateResponsePBImpl(
+ DistributedSchedulerAllocateResponse dsAllocResp =
+ new DistributedSchedulerAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto()));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 2372ea2..de4e22d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -75,6 +75,9 @@
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
@@ -713,8 +716,14 @@ protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
- return new CustomContainerManagerImpl(context, exec, del,
- nodeStatusUpdater, metrics, dirsHandler);
+ if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
+ YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
+ return new CustomQueueingContainerManagerImpl(context, exec, del,
+ nodeStatusUpdater, metrics, dirsHandler);
+ } else {
+ return new CustomContainerManagerImpl(context, exec, del,
+ nodeStatusUpdater, metrics, dirsHandler);
+ }
}
}
@@ -846,6 +855,55 @@ protected void createAMRMProxyService(Configuration conf) {
}
}
+ private class CustomQueueingContainerManagerImpl extends
+ QueuingContainerManagerImpl {
+
+ public CustomQueueingContainerManagerImpl(Context context,
+ ContainerExecutor exec, DeletionService del, NodeStatusUpdater
+ nodeStatusUpdater, NodeManagerMetrics metrics,
+ LocalDirsHandlerService dirsHandler) {
+ super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(ContainerExecutor
+ exec) {
+ return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+
+ @Override
+ public void increaseContainersAllocation(ProcessTreeInfo pti) { }
+
+ @Override
+ public void decreaseContainersAllocation(ProcessTreeInfo pti) { }
+
+ @Override
+ public boolean hasResourcesAvailable(
+ ContainersMonitorImpl.ProcessTreeInfo pti) {
+ return true;
+ }
+ };
+ }
+
+ @Override
+ protected void createAMRMProxyService(Configuration conf) {
+ this.amrmProxyEnabled =
+ conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+ if (this.amrmProxyEnabled) {
+ LOG.info("CustomAMRMProxyService is enabled. "
+ + "All the AM->RM requests will be intercepted by the proxy");
+ AMRMProxyService amrmProxyService =
+ useRpc ? new AMRMProxyService(getContext(), dispatcher)
+ : new ShortCircuitedAMRMProxy(getContext(), dispatcher);
+ this.setAMRMProxyService(amrmProxyService);
+ addService(this.getAMRMProxyService());
+ } else {
+ LOG.info("CustomAMRMProxyService is disabled");
+ }
+ }
+ }
+
private class ShortCircuitedAMRMProxy extends AMRMProxyService {
public ShortCircuitedAMRMProxy(Context context,