From 2a94cb9d28940147228c8cea1a2f0193ca0b3549 Mon Sep 17 00:00:00 2001 From: Garry Weng Date: Wed, 31 Aug 2016 23:10:39 +0800 Subject: [PATCH] Support multi-label merge into one node label& node label wildcard match --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 4 + .../yarn/nodelabels/CommonNodeLabelsManager.java | 4 +- .../resourcemanager/scheduler/ResourceUsage.java | 67 +++++++++- .../resourcemanager/scheduler/SchedulerUtils.java | 87 +++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 4 + .../scheduler/fifo/FifoScheduler.java | 3 + .../scheduler/TestSchedulerUtils.java | 142 +++++++++++++++++++++ 7 files changed, 308 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e224aa7..657c26c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2766,6 +2766,10 @@ public static boolean areNodeLabelsEnabled( public static final String TIMELINE_XFS_OPTIONS = TIMELINE_XFS_PREFIX + "xframe-options"; + public static final boolean DEFAULT_ENABLE_NODELABEL_WILDCARD_CHECK = true; + + public static final String ENABLE_NODELABEL_WILDCARD_CHECK = + RM_PREFIX + "enable.nodelabel.wildcard-check"; public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 1a83632..40fa31c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -73,7 +73,7 @@ public static final String ANY = "*"; public static final Set ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY); private static final Pattern LABEL_PATTERN = Pattern - .compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*"); + .compile("^[0-9a-zA-Z][0-9a-zA-Z-_&]*"); public static final int WILDCARD_PORT = 0; // Flag to identify startup for removelabel private boolean initNodeLabelStoreInProgress = false; @@ -952,7 +952,7 @@ public static void checkAndThrowLabelName(String label) throws IOException { if (!match) { throw new IOException("label name should only contains " - + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}" + + "{0-9, a-z, A-Z, -, _, &} and should not started with {-,_}" + ", now it is=" + label); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 2857379..6b6fbd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -186,6 +186,10 @@ public Resource getPending(String label) { return _get(label, ResourceType.PENDING); } + public Resource getPendingWithLabelWildcard(String label) { + return _getWithLabelWildcard(label, ResourceType.PENDING); + } + public void incPending(String label, Resource res) { _inc(label, ResourceType.PENDING, res); } @@ -338,7 +342,68 @@ private Resource _get(String label, ResourceType type) { readLock.unlock(); } } - + + // get the first match + private Resource _getWithLabelWildcard(String label, ResourceType type) { + if (label == null) { + label = RMNodeLabelsManager.NO_LABEL; + } + + try { + readLock.lock(); + for (Map.Entry entry : usages.entrySet()) { + if (checkNodeLabelWildcard(label, entry.getKey())) { + if (entry.getValue() != null + && !Resources.equals( + normalize(entry.getValue().resArr[type.idx]), + Resources.none())) { + return normalize(entry.getValue().resArr[type.idx]); + } + } + } + return Resources.none(); + } finally { + readLock.unlock(); + } + } + + private boolean isWildcardLabel(String label) { + if(label.startsWith("*") && label.endsWith("*") && label.length() > 2){ + return true; + } + return false; + } + + private boolean checkNodeLabelWildcard(String source, String target) { + if (source == null && target != null) { + return false; + } + + if (source != null && source.equals(target)) { + return true; + } + String targetLabel = target; + if (isWildcardLabel(target)) { + targetLabel = getLabelWithoutWildcard(target); + } else { + return false; + } + String[] sourceLabels = source.split("&"); + for (String label : sourceLabels) { + if (label.equals(targetLabel)) { + return true; + } + } + return false; + } + + private String getLabelWithoutWildcard(String label) { + if (isWildcardLabel(label)) { + return label.substring(1, label.length() - 1); + } + return null; + } + private Resource _getAll(ResourceType type) { try { readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c999e26..1e09da2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -73,6 +74,13 @@ public static final String UNRESERVED_CONTAINER = "Container reservation no longer required."; + private static boolean enableNodelabelWildcardCheck = true; + + public static void setEnableNodelabelWildcardCheck(boolean enable) { + enableNodelabelWildcardCheck = enable; + } + + /** * Utility to create a {@link ContainerStatus} during exceptional * circumstances. @@ -352,6 +360,12 @@ public static boolean checkQueueLabelExpression(Set queueLabels, if (labelExpression == null) { return true; } + if (enableNodelabelWildcardCheck + && checkQueueLabelExpressionWithWildcard(queueLabels, labelExpression, + rmContext)) { + return true; + } + for (String str : labelExpression.split("&&")) { str = str.trim(); if (!str.trim().isEmpty()) { @@ -369,6 +383,67 @@ public static boolean checkQueueLabelExpression(Set queueLabels, return true; } + public static boolean checkQueueLabelExpressionWithWildcard( + Set queueLabels, String labelExpression, RMContext rmContext) { + if (labelExpression == null) { + return true; + } + + // check queue label + if (queueLabels == null) { + return false; + } else { + Set matchedNodeLabels = null; + // check node label manager contains this label + if (null != rmContext) { + RMNodeLabelsManager nlm = rmContext.getNodeLabelManager(); + if (nlm != null) { + matchedNodeLabels = new HashSet(); + for (String label : nlm.getClusterNodeLabelNames()) { + if (checkNodeLabelWildcard(label, labelExpression)) { + matchedNodeLabels.add(label); + } + } + } + } + if(queueLabels.contains(RMNodeLabelsManager.ANY)){ + if(matchedNodeLabels == null || matchedNodeLabels.size()>0){ + return true; + } + } + + for (String label : queueLabels){ + if (checkNodeLabelWildcard(label, labelExpression) + && (matchedNodeLabels == null || matchedNodeLabels.contains(label))) { + return true; + } + } + } + return false; + } + + public static boolean checkNodeLabelWildcard(String source, String target) { + if (source == null && target != null) { + return false; + } + + if (source != null && source.equals(target)) { + return true; + } + String targetLabel = target; + if (target.startsWith("*") && target.endsWith("*") && target.length() > 2) { + targetLabel = target.substring(1, target.length() - 1); + } else { + return false; + } + String[] sourceLabels = source.split("&"); + for (String label : sourceLabels) { + if (label.equals(targetLabel)) { + return true; + } + } + return false; + } public static AccessType toAccessType(QueueACL acl) { switch (acl) { @@ -395,6 +470,10 @@ public static boolean checkResourceRequestMatchingNodePartition( if (null == requestedPartition) { requestedPartition = RMNodeLabelsManager.NO_LABEL; } + if (enableNodelabelWildcardCheck + && checkNodeLabelWildcard(nodePartitionToLookAt, askedNodePartition)) { + return true; + } return requestedPartition.equals(nodePartitionToLookAt); } @@ -404,6 +483,14 @@ private static boolean hasPendingResourceRequest(ResourceCalculator rc, usage.getPending(partitionToLookAt), Resources.none())) { return true; } + if (enableNodelabelWildcardCheck) { + if (Resources.greaterThan(rc, cluster, + usage.getPendingWithLabelWildcard(partitionToLookAt), + Resources.none())) { + LOG.info("Nodelabel " + partitionToLookAt + " wildcard match success."); + return true; + } + } return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 33fe9ad..68193ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -332,6 +332,10 @@ private synchronized void initScheduler(Configuration configuration) throws "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + scheduleAsynchronously + ", " + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + + SchedulerUtils.setEnableNodelabelWildcardCheck(conf.getBoolean( + YarnConfiguration.ENABLE_NODELABEL_WILDCARD_CHECK, + YarnConfiguration.DEFAULT_ENABLE_NODELABEL_WILDCARD_CHECK)); } private synchronized void startSchedulerThreads() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index e9ffd09..e62ede5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -261,6 +261,9 @@ private synchronized void initScheduler(Configuration conf) { this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf); this.activeUsersManager = new ActiveUsersManager(metrics); + SchedulerUtils.setEnableNodelabelWildcardCheck(conf.getBoolean( + YarnConfiguration.ENABLE_NODELABEL_WILDCARD_CHECK, + YarnConfiguration.DEFAULT_ENABLE_NODELABEL_WILDCARD_CHECK)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 63f97c5..f352e70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -777,6 +778,147 @@ public void testNormalizeNodeLabelExpression() } } + @Test + public void testCheckNodeLabelWildcard() { + Assert.assertEquals("Exsit case", + SchedulerUtils.checkNodeLabelWildcard("labelA&labelB", "*labelA*"), + true); + Assert.assertEquals("Exsit case", + SchedulerUtils.checkNodeLabelWildcard("labelA", "*labelA*"), true); + Assert.assertEquals("Not exsit case", + SchedulerUtils.checkNodeLabelWildcard("labelA&labelB", "*labelC*"), + false); + Assert.assertEquals("Illegal case", + SchedulerUtils.checkNodeLabelWildcard("labelA&labelB", "**"), false); + Assert.assertEquals("Illegal case", + SchedulerUtils.checkNodeLabelWildcard(null, "*labelA*"), false); + Assert + .assertEquals("Illegal case", + SchedulerUtils.checkNodeLabelWildcard("labelA&labelB", "labelA"), + false); + Assert.assertEquals("Equal case", + SchedulerUtils.checkNodeLabelWildcard("labelA", "labelA"), true); + + } + + @Test + public void testCheckQueueLabelExpressionWithWildcard() throws IOException { + try + { + Set queueLabels = new HashSet() { + { + add("labelA&labelB"); + add("labelB&labelC"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA&labelB"), + NodeLabel.newInstance("labelB&labelC"))); + Assert.assertEquals("Exsit case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "*labelA*", + rmContext), + true); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA&labelB", "labelB&labelC")); + } + + try { + Set queueLabels = new HashSet() { + { + add("labelA"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA"))); + Assert.assertEquals("Exsit case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "*labelA*", + rmContext), true); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA")); + } + + try { + Set queueLabels = new HashSet() { + { + add("labelA&labelB"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA&labelB"))); + Assert.assertEquals("Not exsit case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "*labelC*", + rmContext), false); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA&labelB")); + } + + try { + Set queueLabels = new HashSet() { + { + add("labelA&labelB"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA&labelB"))); + Assert.assertEquals("Illegal case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "**", rmContext), + false); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA&labelB")); + } + + try { + Set queueLabels = new HashSet() { + { + add("labelA&labelB"); + add("labelB&labelC"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA&labelB"), + NodeLabel.newInstance("labelB&labelC"))); + Assert.assertEquals("Illegal case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "labelA", + rmContext), false); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA&labelB", "labelB&labelC")); + } + + try { + Set queueLabels = new HashSet() { + { + add("labelA"); + } + }; + + rmContext.getNodeLabelManager().addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("labelA"))); + Assert.assertEquals("Equal case", SchedulerUtils + .checkQueueLabelExpressionWithWildcard(queueLabels, "labelA", + rmContext), true); + } catch (Exception e) { + } finally { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels( + Arrays.asList("labelA")); + } + + } + public static void waitSchedulerApplicationAttemptStopped(CapacityScheduler cs, ApplicationAttemptId attemptId) throws InterruptedException { FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(attemptId); -- 1.9.4.msysgit.0