diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586bfa3708bc2d68d51c416cf7b5a0c6a61d7..6ab74ab8f4e5de671c65bc96bf8622c7688e084f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -191,6 +192,11 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( } @Override + public ContainerQueueInfo getContainerQueueInfo() { + return null; + } + + @Override public ResourceUtilization getAggregatedContainersUtilization() { return null; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb27781a63c315e015bb371c9f7321bff8d..8f97e78b87674642fb0cdfd1530a0d0b78bbd836 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -180,6 +181,11 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( } @Override + public ContainerQueueInfo getContainerQueueInfo() { + return null; + } + + @Override public ResourceUtilization getAggregatedContainersUtilization() { return node.getAggregatedContainersUtilization(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ac8aeeae47456f7d69ffbd5a2c484131f46b32b8..b39852635fb4a3c4dd925f9e074aa90ee6d61a61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -312,6 +312,18 @@ private static void addDeprecatedKeys() { YARN_PREFIX + "distributed-scheduling.max-vcores"; public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4; + public static final String DIST_SCHEDULING_TOP_K = + YARN_PREFIX + "distributed-scheduling.top-k"; + public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; + + public static final String DIST_SCHEDULING_NUM_STRAGGLERS = + YARN_PREFIX + "distributed-scheduling.num-stragglers"; + public static final int DIST_SCHEDULING_NUM_STRAGGLERS_DEFAULT = 2; + + public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS = + YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms"; + public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000; + /** Container token expiry for container allocated via * Distributed Scheduling */ public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..7265e1293709f7dc204d789989391b4a9c3ddf8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +public abstract class EventDispatcher extends AbstractService implements + EventHandler { + + private final EventHandler handler; + private final BlockingQueue eventQueue = + new LinkedBlockingDeque<>(); + private final Thread eventProcessor; + private volatile boolean stopped = false; + private boolean shouldExitOnError = false; + + private static final Log LOG = LogFactory.getLog(EventDispatcher.class); + + private final class EventProcessor implements Runnable { + @Override + public void run() { + + T event; + + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + event = eventQueue.take(); + } catch (InterruptedException e) { + LOG.error("Returning, interrupted : " + e); + return; // TODO: Kill RM. + } + + try { + handler.handle(event); + } catch (Throwable t) { + // An error occurred, but we are shutting down anyway. + // If it was an InterruptedException, the very act of + // shutdown could have caused it and is probably harmless. + if (stopped) { + LOG.warn("Exception during shutdown: ", t); + break; + } + LOG.fatal("Error in handling event type " + event.getType() + + " to the scheduler", t); + if (shouldExitOnError + && !ShutdownHookManager.get().isShutdownInProgress()) { + LOG.info("Exiting, bbye.."); + System.exit(-1); + } + } + } + } + } + + public EventDispatcher(EventHandler handler, String name) { + super(name); + this.handler = handler; + this.eventProcessor = new Thread(new EventProcessor()); + this.eventProcessor.setName(getName() + ":Event Processor"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.shouldExitOnError = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.eventProcessor.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + this.stopped = true; + this.eventProcessor.interrupt(); + try { + this.eventProcessor.join(); + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + super.serviceStop(); + } + + @Override + public void handle(T event) { + try { + int qSize = eventQueue.size(); + if (qSize !=0 && qSize %1000 == 0) { + LOG.info("Size of " + getName() + " event-queue is " + qSize); + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.info("Very low remaining capacity on " + getName() + "" + + "event queue: " + remCapacity); + } + this.eventQueue.put(event); + } catch (InterruptedException e) { + LOG.info("Interrupted. Trying to exit gracefully."); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueueInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..3a3b5cf8d647904cd90f5e5ea18bc02fbdebce90 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueueInfo.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * ContainerQueueInfo captures information pertaining to the state + * of execution of the Queueable containers within a node. + *

+ */ +@Private +@Evolving +public abstract class ContainerQueueInfo { + public static ContainerQueueInfo newInstance() { + return Records.newRecord(ContainerQueueInfo.class); + } + + public abstract int getEstimatedQueueWaitTime(); + + public abstract void setEstimatedQueueWaitTime(int queueWaitTime); + + public abstract int getWaitQueueLength(); + + public abstract void setWaitQueueLength(int waitQueueLength); + + public abstract int getContainersKilled(); + + public abstract void setContainersKilled(int containersKilled); + + public abstract int getContainersDequeued(); + + public abstract void setContainersDequeued(int containersDequeued); + + public abstract ResourceUtilization getContainersUtilization(); + + public abstract void setContainersUtilization( + ResourceUtilization runningContainersUtilization); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 836cd4b79faa8de79bad15d2bcd5282193986563..2688522106d563761480959dfb3e4200283a1b55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -122,4 +122,13 @@ public abstract void setNodeUtilization( @Unstable public abstract void setIncreasedContainers( List increasedContainers); + + @Private + @Unstable + public abstract ContainerQueueInfo getContainerQueueInfo(); + + @Private + @Unstable + public abstract void setContainerQueueInfo( + ContainerQueueInfo containerQueueInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueueInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueueInfoPBImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..923de8ea49fbac9a315597f49a9f6b84c293dc5a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueueInfoPBImpl.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; + +public class ContainerQueueInfoPBImpl extends ContainerQueueInfo { + + private YarnServerCommonProtos.ContainerQueueInfoProto proto = + YarnServerCommonProtos.ContainerQueueInfoProto.getDefaultInstance(); + private YarnServerCommonProtos.ContainerQueueInfoProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueueInfoPBImpl() { + builder = YarnServerCommonProtos.ContainerQueueInfoProto.newBuilder(); + } + + public ContainerQueueInfoPBImpl(YarnServerCommonProtos + .ContainerQueueInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServerCommonProtos.ContainerQueueInfoProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = + YarnServerCommonProtos.ContainerQueueInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getEstimatedQueueWaitTime() { + YarnServerCommonProtos.ContainerQueueInfoProtoOrBuilder p = + viaProto ? proto : builder; + return p.getEstimatedQueueWaitTime(); + } + + @Override + public void setEstimatedQueueWaitTime(int queueWaitTime) { + maybeInitBuilder(); + builder.setEstimatedQueueWaitTime(queueWaitTime); + } + + @Override + public int getWaitQueueLength() { + YarnServerCommonProtos.ContainerQueueInfoProtoOrBuilder p = + viaProto ? proto : builder; + return p.getWaitQueueLength(); + } + + @Override + public void setWaitQueueLength(int waitQueueLength) { + maybeInitBuilder(); + builder.setWaitQueueLength(waitQueueLength); + } + + @Override + public int getContainersKilled() { + YarnServerCommonProtos.ContainerQueueInfoProtoOrBuilder p = + viaProto ? proto : builder; + return p.getContainersKilled(); + } + + @Override + public void setContainersKilled(int containersKilled) { + maybeInitBuilder(); + builder.setContainersKilled(containersKilled); + } + + @Override + public int getContainersDequeued() { + YarnServerCommonProtos.ContainerQueueInfoProtoOrBuilder p = + viaProto ? proto : builder; + return p.getContainersDequeued(); + } + + @Override + public void setContainersDequeued(int containersDequeued) { + maybeInitBuilder(); + builder.setContainersDequeued(containersDequeued); + } + + @Override + public ResourceUtilization getContainersUtilization() { + YarnServerCommonProtos.ContainerQueueInfoProtoOrBuilder p = + viaProto ? proto : builder; + if (!p.hasContainersUtilization()) { + return null; + } + return convertFromProtoFormat(p.getContainersUtilization()); + } + + @Override + public void setContainersUtilization(ResourceUtilization + runningContainersUtilization) { + maybeInitBuilder(); + if (runningContainersUtilization == null) { + this.builder.clearContainersUtilization(); + return; + } + this.builder.setContainersUtilization( + convertToProtoFormat(runningContainersUtilization)); + } + + private YarnProtos.ResourceUtilizationProto convertToProtoFormat( + ResourceUtilization r) { + return ((ResourceUtilizationPBImpl) r).getProto(); + } + + private ResourceUtilizationPBImpl convertFromProtoFormat( + YarnProtos.ResourceUtilizationProto p) { + return new ResourceUtilizationPBImpl(p); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8dd4832d7fc7842591c77ee301b729ac48aaf00d..a59aa6cb84573aa5a43c6b447f833400467b33dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -400,6 +403,27 @@ public synchronized void setIncreasedContainers( this.increasedContainers = increasedContainers; } + @Override + public ContainerQueueInfo getContainerQueueInfo() { + NodeStatusProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasContainerQueueInfo()) { + return null; + } + return convertFromProtoFormat(p.getContainerQueueInfo()); + } + + @Override + public void setContainerQueueInfo(ContainerQueueInfo containerQueueInfo) { + maybeInitBuilder(); + if (containerQueueInfo == null) { + this.builder.clearContainerQueueInfo(); + return; + } + this.builder.setContainerQueueInfo( + convertToProtoFormat(containerQueueInfo)); + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -433,15 +457,25 @@ private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } - private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { + private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { return ((ResourceUtilizationPBImpl) r).getProto(); } private ResourceUtilizationPBImpl convertFromProtoFormat( - ResourceUtilizationProto p) { + YarnProtos.ResourceUtilizationProto p) { return new ResourceUtilizationPBImpl(p); } + private YarnServerCommonProtos.ContainerQueueInfoProto convertToProtoFormat( + ContainerQueueInfo r) { + return ((ContainerQueueInfoPBImpl) r).getProto(); + } + + private ContainerQueueInfo convertFromProtoFormat( + YarnServerCommonProtos.ContainerQueueInfoProto p) { + return new ContainerQueueInfoPBImpl(p); + } + private ContainerPBImpl convertFromProtoFormat( ContainerProto c) { return new ContainerPBImpl(c); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 77064a0e5cce984aebc9e308bebd7e5b52ad8742..ced714a36a875c22a47137140eb29b216c6671b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -39,6 +39,15 @@ message NodeStatusProto { optional ResourceUtilizationProto containers_utilization = 6; optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; + optional ContainerQueueInfoProto container_queue_info = 9; +} + +message ContainerQueueInfoProto { + optional int32 estimated_queue_wait_time = 1; + optional int32 wait_queue_length = 2; + optional int32 containers_killed = 3; + optional int32 containers_dequeued = 4; + optional ResourceUtilizationProto containers_utilization = 5; } message MasterKeyProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5806731443620193763ff4f7d4674ee0d77a0341..f2341d03f9a31bf143926e0bbef39dcfc1bdbef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -443,6 +445,13 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { return nodeStatus; } + private ContainerQueueInfo getContainerQueueInfo() { + ContainerManagerImpl containerManager = + (ContainerManagerImpl) this.context.getContainerManager(); + ContainersMonitor containersMonitor = + containerManager.getContainersMonitor(); + return containersMonitor.getContainerQueueInfo(); + } /** * Get the aggregated utilization of the containers in this node. * @return Resource utilization of all the containers. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 4d69dbfbc21609d62253fc9ba081a5c58546f345..3ef11d086127ca4ea3be74e4aec17b22a884add4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -20,10 +20,12 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; public interface ContainersMonitor extends Service, EventHandler, ResourceView { public ResourceUtilization getContainersUtilization(); + public ContainerQueueInfo getContainerQueueInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 446e7a1270e9bc61574b972eff5a064e480e94ea..d46c39cb05c00514278b79220fbc80afb6113311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -84,6 +85,7 @@ private ResourceUtilization containersUtilization; private volatile boolean stopped = false; + private ContainerQueueInfo containerQueueInfo; public ContainersMonitorImpl(ContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { @@ -96,6 +98,7 @@ public ContainersMonitorImpl(ContainerExecutor exec, this.monitoringThread = new MonitoringThread(); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); + this.containerQueueInfo = ContainerQueueInfo.newInstance(); } @Override @@ -698,6 +701,15 @@ public void setContainersUtilization(ResourceUtilization utilization) { } @Override + public ContainerQueueInfo getContainerQueueInfo() { + return this.containerQueueInfo; + } + + public void setContainerQueueInfo(ContainerQueueInfo containerQueueInfo) { + this.containerQueueInfo = containerQueueInfo; + } + + @Override @SuppressWarnings("unchecked") public void handle(ContainersMonitorEvent monitoringEvent) { ContainerId containerId = monitoringEvent.getContainerId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java new file mode 100644 index 0000000000000000000000000000000000000000..4fd62d0b2dae93b2b4f3cf78693b25f792dd77b2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +import java.util.List; + +public interface ClusterMonitor { + + void addNode(List containerStatuses, RMNode rmNode); + + void removeNode(RMNode removedRMNode); + + void nodeUpdate(RMNode rmNode); + + void updateNodeResource(RMNode rmNode, ResourceOption resourceOption); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java deleted file mode 100644 index f59a322291467a9c556fa490fc2378834d32fda4..0000000000000000000000000000000000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; -import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; - - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.security - .AMRMTokenSecretManager; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; - -public class DistributedSchedulingService extends ApplicationMasterService - implements DistributedSchedulerProtocol { - - public DistributedSchedulingService(RMContext rmContext, - YarnScheduler scheduler) { - super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - } - - @Override - public Server getServer(YarnRPC rpc, Configuration serverConf, - InetSocketAddress addr, AMRMTokenSecretManager secretManager) { - Server server = rpc.getServer(DistributedSchedulerProtocol.class, this, - addr, serverConf, secretManager, - serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); - // To support application running no NMs that DO NOT support - // Dist Scheduling... - ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - ApplicationMasterProtocolPB.class, - ApplicationMasterProtocolService.newReflectiveBlockingService( - new ApplicationMasterProtocolPBServiceImpl(this))); - return server; - } - - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { - return super.registerApplicationMaster(request); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - return super.finishApplicationMaster(request); - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - return super.allocate(request); - } - - @Override - public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { - RegisterApplicationMasterResponse response = - registerApplicationMaster(request); - DistSchedRegisterResponse dsResp = recordFactory - .newRecordInstance(DistSchedRegisterResponse.class); - dsResp.setRegisterResponse(response); - dsResp.setMinAllocatableCapabilty( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, - YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, - YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT) - ) - ); - dsResp.setMaxAllocatableCapabilty( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, - YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT) - ) - ); - dsResp.setContainerTokenExpiryInterval( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, - YarnConfiguration. - DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); - dsResp.setContainerIdStart( - this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); - - // Set nodes to be used for scheduling - // TODO: The actual computation of the list will happen in YARN-4412 - // TODO: Till then, send the complete list - dsResp.setNodesForScheduling( - new ArrayList<>(this.rmContext.getRMNodes().keySet())); - return dsResp; - } - - @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { - AllocateResponse response = allocate(request); - DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance - (DistSchedAllocateResponse.class); - dsResp.setAllocateResponse(response); - dsResp.setNodesForScheduling( - new ArrayList<>(this.rmContext.getRMNodes().keySet())); - return dsResp; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java new file mode 100644 index 0000000000000000000000000000000000000000..e19be1778df44709553b1c11c05ce29ed02bdb48 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.List; + +public interface NodeSelector { + + List selectNodes(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 4fb38bb2f1abc4c55751047eb754c5d4f1e72992..5ae097a38ac45c40f25c6ddf9091e0a97fa8548c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -31,7 +31,6 @@ import org.apache.hadoop.security.*; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; @@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -79,6 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .DistributedSchedulingService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -104,8 +106,6 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; /** * The ResourceManager is the main class that is a set of components. @@ -647,101 +647,10 @@ protected void createPolicyMonitors() { } @Private - public static class SchedulerEventDispatcher extends AbstractService - implements EventHandler { - - private final ResourceScheduler scheduler; - private final BlockingQueue eventQueue = - new LinkedBlockingQueue(); - private final Thread eventProcessor; - private volatile boolean stopped = false; - private boolean shouldExitOnError = false; - + public static class SchedulerEventDispatcher extends + EventDispatcher { public SchedulerEventDispatcher(ResourceScheduler scheduler) { - super(SchedulerEventDispatcher.class.getName()); - this.scheduler = scheduler; - this.eventProcessor = new Thread(new EventProcessor()); - this.eventProcessor.setName("ResourceManager Event Processor"); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.shouldExitOnError = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - this.eventProcessor.start(); - super.serviceStart(); - } - - private final class EventProcessor implements Runnable { - @Override - public void run() { - - SchedulerEvent event; - - while (!stopped && !Thread.currentThread().isInterrupted()) { - try { - event = eventQueue.take(); - } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); - return; // TODO: Kill RM. - } - - try { - scheduler.handle(event); - } catch (Throwable t) { - // An error occurred, but we are shutting down anyway. - // If it was an InterruptedException, the very act of - // shutdown could have caused it and is probably harmless. - if (stopped) { - LOG.warn("Exception during shutdown: ", t); - break; - } - LOG.fatal("Error in handling event type " + event.getType() - + " to the scheduler", t); - if (shouldExitOnError - && !ShutdownHookManager.get().isShutdownInProgress()) { - LOG.info("Exiting, bbye.."); - System.exit(-1); - } - } - } - } - } - - @Override - protected void serviceStop() throws Exception { - this.stopped = true; - this.eventProcessor.interrupt(); - try { - this.eventProcessor.join(); - } catch (InterruptedException e) { - throw new YarnRuntimeException(e); - } - super.serviceStop(); - } - - @Override - public void handle(SchedulerEvent event) { - try { - int qSize = eventQueue.size(); - if (qSize !=0 && qSize %1000 == 0) { - LOG.info("Size of scheduler event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.info("Very low remaining capacity on scheduler event queue: " - + remCapacity); - } - this.eventQueue.put(event); - } catch (InterruptedException e) { - LOG.info("Interrupted. Trying to exit gracefully."); - } + super(scheduler, SchedulerEventDispatcher.class.getName()); } } @@ -1146,7 +1055,20 @@ protected ApplicationMasterService createApplicationMasterService() { if (this.rmContext.getYarnConfiguration().getBoolean( YarnConfiguration.DIST_SCHEDULING_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - return new DistributedSchedulingService(this.rmContext, scheduler); + DistributedSchedulingService distributedSchedulingService = new + DistributedSchedulingService(this.rmContext, scheduler); + DistributedSchedulingService.DistSchedulerEventDispatcher + distSchedulerEventDispatcher = + new DistributedSchedulingService.DistSchedulerEventDispatcher( + distributedSchedulingService); + // Add an event dispoatcher for the DistributedSchedulingService + // to handle node updates/additions and removals. + // Since the SchedulerEvent is currently a super set of theses, + // we register interest for it.. + addService(distSchedulerEventDispatcher); + rmDispatcher.register(SchedulerEventType.class, + distSchedulerEventDispatcher); + return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f16ef8afe24f6bc30e0b717eb6704ff7d04..6f028824a91dcb3a167186d774b9c25f2e83acaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; /** * Node managers information on available resources @@ -168,4 +169,7 @@ public void updateNodeHeartbeatResponseForContainersDecreasing( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); + + public ContainerQueueInfo getContainerQueueInfo(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 8448287fe412223e0cdf448758ea5aaf7173b03e..cdf392d39836cefb0e90dd394f47c6d3e5b0d831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; @@ -120,6 +121,9 @@ /* Resource utilization for the node. */ private ResourceUtilization nodeUtilization; + /* Container Queue Information for the node.. Used by Distributed Scheduler */ + private ContainerQueueInfo containerQueueInfo; + private final ContainerAllocationExpirer containerAllocationExpirer; /* set of containers that have just launched */ private final Set launchedContainers = @@ -1080,6 +1084,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.setAggregatedContainersUtilization( statusEvent.getAggregatedContainersUtilization()); rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); + rmNode.setContainerQueueInfo(statusEvent.getContainerQueueInfo()); NodeState initialState = rmNode.getState(); boolean isNodeDecommissioning = initialState.equals(NodeState.DECOMMISSIONING); @@ -1160,6 +1165,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.setAggregatedContainersUtilization( statusEvent.getAggregatedContainersUtilization()); rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); + rmNode.setContainerQueueInfo(statusEvent.getContainerQueueInfo()); if (remoteNodeHealthStatus.getIsNodeHealthy()) { rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); @@ -1340,4 +1346,25 @@ private void handleLogAggregationStatus( writeLock.unlock(); } } - } + + @Override + public ContainerQueueInfo getContainerQueueInfo() { + this.readLock.lock(); + + try { + return this.containerQueueInfo; + } finally { + this.readLock.unlock(); + } + } + + public void setContainerQueueInfo(ContainerQueueInfo containerQueueInfo) { + this.writeLock.lock(); + + try { + this.containerQueueInfo = containerQueueInfo; + } finally { + this.writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index ba6ac9bd0fad6601a1ffbe9a738d9aa9440b9f89..a396ee9da2168b8e570c42a915a04b90460c0bca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -79,6 +80,10 @@ public ResourceUtilization getNodeUtilization() { return this.logAggregationReportsForApps; } + public ContainerQueueInfo getContainerQueueInfo() { + return this.nodeStatus.getContainerQueueInfo(); + } + public void setLogAggregationReportsForApps( List logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; @@ -89,4 +94,6 @@ public void setLogAggregationReportsForApps( return this.nodeStatus.getIncreasedContainers() == null ? Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers(); } + + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java new file mode 100644 index 0000000000000000000000000000000000000000..9edff9280262b591d1d0a72324744d4d449872c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.event.EventDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; + + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security + .AMRMTokenSecretManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + +public class DistributedSchedulingService extends ApplicationMasterService + implements DistributedSchedulerProtocol, EventHandler { + + private static final Log LOG = + LogFactory.getLog(DistributedSchedulingService.class); + + public static class DistSchedulerEventDispatcher extends + EventDispatcher { + public DistSchedulerEventDispatcher(EventHandler handler) { + super(handler, handler.getClass().getName()); + } + } + + private final ClusterMonitor clusterMonitor; + private final NodeSelector nodeSelector; + + public DistributedSchedulingService(RMContext rmContext, + YarnScheduler scheduler) { + super(DistributedSchedulingService.class.getName(), rmContext, scheduler); + int k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.DIST_SCHEDULING_TOP_K, + YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); + int stragglers = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.DIST_SCHEDULING_NUM_STRAGGLERS, + YarnConfiguration.DIST_SCHEDULING_NUM_STRAGGLERS_DEFAULT); + long topKComputationInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); + TopKNodeSelector topKSelector = + new TopKNodeSelector(k, stragglers, topKComputationInterval); + this.clusterMonitor = topKSelector; + this.nodeSelector = topKSelector; + } + + @Override + public Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + Server server = rpc.getServer(DistributedSchedulerProtocol.class, this, + addr, serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + // To support application running no NMs that DO NOT support + // Dist Scheduling... + ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + ApplicationMasterProtocolPB.class, + ApplicationMasterProtocolService.newReflectiveBlockingService( + new ApplicationMasterProtocolPBServiceImpl(this))); + return server; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return super.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return super.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return super.allocate(request); + } + + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse response = + registerApplicationMaster(request); + DistSchedRegisterResponse dsResp = recordFactory + .newRecordInstance(DistSchedRegisterResponse.class); + dsResp.setRegisterResponse(response); + dsResp.setMinAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT) + ) + ); + dsResp.setMaxAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT) + ) + ); + dsResp.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, + YarnConfiguration. + DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); + dsResp.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + + // Set nodes to be used for scheduling + dsResp.setNodesForScheduling( + new ArrayList<>(this.nodeSelector.selectNodes())); + return dsResp; + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + AllocateResponse response = allocate(request); + DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance + (DistSchedAllocateResponse.class); + dsResp.setAllocateResponse(response); + dsResp.setNodesForScheduling( + new ArrayList<>(this.rmContext.getRMNodes().keySet())); + return dsResp; + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; + clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; + clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; + clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case KILL_CONTAINER: + break; + case PREEMPT_CONTAINER: + break; + case CONTAINER_EXPIRED: + break; + case CONTAINER_RESCHEDULED: + break; + case DROP_RESERVATION: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingService: " + + event.toString()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java new file mode 100644 index 0000000000000000000000000000000000000000..cedbdcc28748edf5205475eb466b586cd4bbeb0f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.commons.lang.math.IntRange; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode + .UpdatedContainerInfo; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class TopKNodeSelector implements ClusterMonitor, NodeSelector { + + final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); + + static class ClusterNode implements Comparable { + final NodeId nodeId; + int queueTime; + double timestamp; + + public ClusterNode(int queueTime, NodeId nodeId) { + this.nodeId = nodeId; + setQueueTime(queueTime); + updateTimestamp(); + } + + public void setQueueTime(int queueTime) { + this.queueTime = queueTime; + } + + public void updateTimestamp() { + this.timestamp = System.currentTimeMillis(); + } + + @Override + public int compareTo(ClusterNode o) { + if (this.queueTime == o.queueTime) { + return this.timestamp < o.timestamp ? +1 : -1; + } + return this.queueTime > o.queueTime ? +1 : -1; + } + } + + private final int k; + private final List topKNodes; + private final int stragglerNodes; + private final ScheduledExecutorService scheduledExecutor; + private final TreeSet clusterNodes = new TreeSet<>(); + private AtomicLong successTasksNo = new AtomicLong(0); + private final Random random = new Random(); + + public TopKNodeSelector(int k, int numStragglerNodes, + long nodeComputationInterval) { + this.k = k; + this.topKNodes = new ArrayList<>(); + this.stragglerNodes = numStragglerNodes; + this.scheduledExecutor = Executors.newScheduledThreadPool(1); + this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + synchronized (topKNodes) { + topKNodes.clear(); + topKNodes.addAll( + stragglerNodes > 0 ? + computeTopKNodesWithStragglers() : computeTopKNodes()); + } + } + }, nodeComputationInterval, nodeComputationInterval, TimeUnit.MILLISECONDS); + + } + + + @Override + public void addNode(List containerStatuses, RMNode + rmNode) { + LOG.debug("Node added event from: " + rmNode.getNode().getName()); + // Ignoring this currently : atleast one NODE_UPDATE heartbeat is + // required to ensure node eligibility. + } + + @Override + public void removeNode(RMNode removedRMNode) { + LOG.debug("Node delete event for: " + removedRMNode.getNode().getName()); + synchronized (this.clusterNodes) { + Iterator iterator = this.clusterNodes.iterator(); + while (iterator.hasNext()) { + ClusterNode currentElement = iterator.next(); + if (currentElement.nodeId.equals(removedRMNode.getNodeID())) { + iterator.remove(); + LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID()); + return; + } + } + LOG.debug("Node not in list!"); + } + } + + @Override + public void nodeUpdate(RMNode rmNode) { + LOG.debug("Node update event from: " + rmNode.getNode().getName()); + ContainerQueueInfo containerQueueInfo = rmNode.getContainerQueueInfo(); + updateSuccessfulContainers(rmNode.pullContainerUpdates()); + int estimatedQueueWaitTime = containerQueueInfo.getEstimatedQueueWaitTime(); + synchronized (this.clusterNodes) { + Iterator iterator = this.clusterNodes.iterator(); + while (iterator.hasNext()) { + ClusterNode currentNode = iterator.next(); + if (currentNode.nodeId.equals(rmNode.getNodeID())) { + iterator.remove(); + if (estimatedQueueWaitTime != -1) { + this.clusterNodes.add(new ClusterNode(estimatedQueueWaitTime, + rmNode.getNodeID())); + LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + + "with queue wait time [" + estimatedQueueWaitTime + "]"); + return; + } else { + LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + + "with queue wait time [" + estimatedQueueWaitTime + "]"); + return; + } + } + } + if (estimatedQueueWaitTime != -1) { + this.clusterNodes.add( + new ClusterNode(estimatedQueueWaitTime, rmNode.getNodeID())); + LOG.info("Inserting new ClusterNode [" + rmNode.getNodeID() + "]"); + } + } + } + + private void updateSuccessfulContainers(List + updatedContainerInfos) { + List newlyLaunchedContainers = new ArrayList<>(); + List completedContainers = new ArrayList<>(); + for (UpdatedContainerInfo containerInfo : updatedContainerInfos) { + newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers + ()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + int successfulTasks = 0; + // Process completed containers + for (ContainerStatus completedContainer : completedContainers) { + if (completedContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC + && completedContainer.getExitStatus() != ContainerExitStatus.ABORTED + && !completedContainer.getDiagnostics().contains("fail")) { + successfulTasks++; + } + } + if (successfulTasks > 0) { + LOG.info("Num successful completed containers : " + successfulTasks); + } + this.successTasksNo.addAndGet(successfulTasks); + } + + @Override + public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { + LOG.debug("Node resource update event from: " + rmNode.getNodeID()); + // Ignoring this currently... + } + + @Override + public List selectNodes() { + synchronized (this.topKNodes) { + return new ArrayList<>(this.topKNodes); + } + } + + private List computeTopKNodes() { + synchronized (this.clusterNodes) { + Iterator iterator = this.clusterNodes.iterator(); + List retList = new ArrayList<>(); + while (iterator.hasNext() && retList.size() <= this.k) { + retList.add(iterator.next().nodeId); + } + return retList; + } + } + + private List computeTopKNodesWithStragglers() { + synchronized (this.clusterNodes) { + if (this.clusterNodes.size() < 2 * this.k) { + LOG.info("Not enough nodes reported to introduce stragglers."); + return computeTopKNodes(); + } + List nodesList = new ArrayList<>(); + synchronized (this.clusterNodes) { + for (ClusterNode node : this.clusterNodes) { + nodesList.add(node.nodeId); + } + } + List retList = nodesList.subList(0, this.k); + + int[] nodesToReplace = shuffle(this.stragglerNodes, + new IntRange(0, this.k - 1).toArray(), random); + int[] nodesToAdd = shuffle(this.stragglerNodes, + new IntRange(this.k, nodesList.size() - 1).toArray(), random); + if (nodesToReplace.length < this.stragglerNodes + || nodesToAdd.length < this.stragglerNodes) { + return computeTopKNodes(); + } + for (int i = 0; i < nodesToReplace.length; i++) { + retList.set(nodesToReplace[i], nodesList.get(nodesToAdd[i])); + } + return retList; + } + } + + // Return *num* elements of shuffled array + private static int[] shuffle(int num, int[] arr, Random rnd) { + for (int i = arr.length; i > 1; i--) { + swap(arr, i - 1, rnd.nextInt(i)); + num--; + if (num == 0) { + return Arrays.copyOfRange(arr, i - 1, arr.length); + } + } + return arr; + } + + private static void swap(int[] arr, int i, int j) { + int tmp = arr[i]; + arr[i] = arr[j]; + arr[j] = tmp; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29b8349e94b3562ae75602d9cf2914f62f6..f2d98b73506f2083b915a685f6337d8c8439d1d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -260,6 +261,11 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public ContainerQueueInfo getContainerQueueInfo() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index bd1f15c6376def9f462fe4646f9ecd97bcc65d65..709716f339d16ba98dc1cdc1bbb3b4e2558daaf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -88,6 +88,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .DistributedSchedulingService; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 262fd5a4c209a27ee72c910dd23c5c3d58edf338..96f47045ae8d6b1b5573a3fedb2321f39ee9d9e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .DistributedSchedulingService; import org.junit.Assert; import org.junit.Test;