diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f493fd3..cbd28ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2072,6 +2072,12 @@ private static void addDeprecatedKeys() { */ public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; + /** Node label store implementation class */ + public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX + + "fs-store.impl.class"; + public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS = + "org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore"; + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + "fs-store.root-dir"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index a00c49d..8b26cc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; @@ -224,10 +225,20 @@ protected void serviceInit(Configuration conf) throws Exception { labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL)); } + boolean isCentralizedConfiguration() { + return isCentralizedNodeLabelConfiguration; + } + protected void initNodeLabelStore(Configuration conf) throws Exception { - this.store = new FileSystemNodeLabelsStore(this); + this.store = + ReflectionUtils + .newInstance( + conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, + FileSystemNodeLabelsStore.class, NodeLabelsStore.class), + conf); + this.store.setNodeLabelsManager(this); this.store.init(conf); - this.store.recover(!isCentralizedNodeLabelConfiguration); + this.store.recover(); } // for UT purpose diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java index c9727a2..a65349b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java @@ -52,11 +52,6 @@ import com.google.common.collect.Sets; public class FileSystemNodeLabelsStore extends NodeLabelsStore { - - public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) { - super(mgr); - } - protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class); protected static final String DEFAULT_DIR_NAME = "node-labels"; @@ -69,8 +64,8 @@ public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) { Path fsWorkingPath; FileSystem fs; - FSDataOutputStream editlogOs; - Path editLogPath; + private FSDataOutputStream editlogOs; + private Path editLogPath; private String getDefaultFSNodeLabelsRootDir() throws IOException { // default is in local: /tmp/hadoop-yarn-${user}/node-labels/ @@ -160,12 +155,40 @@ public void removeClusterNodeLabels(Collection labels) ensureCloseEditlogFile(); } } + + protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath) + throws IOException { + // If mirror.new exists, read from mirror.new, + FSDataInputStream is = null; + if (fs.exists(newMirrorPath)) { + is = fs.open(newMirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + List labels = new AddToClusterNodeLabelsRequestPBImpl( + AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)) + .getNodeLabels(); + mgr.addToCluserNodeLabels(labels); + + if (mgr.isCentralizedConfiguration()) { + // Only load node to labels mapping while using centralized configuration + Map> nodeToLabels = + new ReplaceLabelsOnNodeRequestPBImpl( + ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) + .getNodeToLabels(); + mgr.replaceLabelsOnNode(nodeToLabels); + } + is.close(); + } + } /* (non-Javadoc) * @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean) */ @Override - public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException, + public void recover() throws YarnException, IOException { /* * Steps of recover @@ -181,31 +204,13 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException, // Open mirror from serialized file Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); - - FSDataInputStream is = null; - if (fs.exists(mirrorPath)) { - is = fs.open(mirrorPath); - } else if (fs.exists(oldMirrorPath)) { - is = fs.open(oldMirrorPath); - } - - if (null != is) { - List labels = - new AddToClusterNodeLabelsRequestPBImpl( - AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels(); - Map> nodeToLabels = - new ReplaceLabelsOnNodeRequestPBImpl( - ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) - .getNodeToLabels(); - mgr.addToCluserNodeLabels(labels); - mgr.replaceLabelsOnNode(nodeToLabels); - is.close(); - } + + loadFromMirror(mirrorPath, oldMirrorPath); // Open and process editlog editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME); if (fs.exists(editLogPath)) { - is = fs.open(editLogPath); + FSDataInputStream is = fs.open(editLogPath); while (true) { try { @@ -233,7 +238,7 @@ public void recover(boolean ignoreNodeToLabelsMappings) throws YarnException, new ReplaceLabelsOnNodeRequestPBImpl( ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) .getNodeToLabels(); - if (!ignoreNodeToLabelsMappings) { + if (mgr.isCentralizedConfiguration()) { /* * In case of Distributed NodeLabels setup, * ignoreNodeToLabelsMappings will be set to true and recover will diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java index 46b94fd..aacb920 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java @@ -31,11 +31,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; public abstract class NodeLabelsStore implements Closeable { - protected final CommonNodeLabelsManager mgr; - - public NodeLabelsStore(CommonNodeLabelsManager mgr) { - this.mgr = mgr; - } + protected CommonNodeLabelsManager mgr; /** * Store node {@literal ->} label @@ -62,16 +58,14 @@ public abstract void removeClusterNodeLabels(Collection labels) * ignoreNodeToLabelsMappings will be set to true and recover will be invoked * as RM will collect the node labels from NM through registration/HB * - * @param ignoreNodeToLabelsMappings * @throws IOException * @throws YarnException */ - public abstract void recover(boolean ignoreNodeToLabelsMappings) - throws IOException, YarnException; + public abstract void recover() throws IOException, YarnException; public void init(Configuration conf) throws Exception {} - - public CommonNodeLabelsManager getNodeLabelsManager() { - return mgr; + + public void setNodeLabelsManager(CommonNodeLabelsManager mgr) { + this.mgr = mgr; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java new file mode 100644 index 0000000..f4b3804 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; + +public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore { + + @Override + public void recover() throws YarnException, + IOException { + Path newMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new"); + Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); + loadFromMirror(newMirrorPath, oldMirrorPath); + + // if new mirror exists, remove old mirror and rename new mirror + if (fs.exists(newMirrorPath)) { + // remove old mirror + try { + fs.delete(oldMirrorPath, false); + } catch (IOException e) { + // do nothing + if (LOG.isDebugEnabled()) { + LOG.debug(e, e); + } + } + + // rename new to old + fs.rename(newMirrorPath, oldMirrorPath); + } + } + + @Override + public void updateNodeToLabelsMappings( + Map> nodeToLabels) throws IOException { + writeNewMirror(); + } + + @Override + public void storeNewClusterNodeLabels(List labels) + throws IOException { + writeNewMirror(); + } + + @Override + public void removeClusterNodeLabels(Collection labels) + throws IOException { + writeNewMirror(); + } + + private void writeNewMirror() throws IOException { + try { + // Acquire readlock to make sure we get cluster node labels and + // node-to-labels mapping atomically. + mgr.readLock.lock(); + List nodeLabels = mgr.getClusterNodeLabels(); + Map> nodeToLabels = mgr.getNodeLabels(); + + // Write mirror to mirror.new.tmp file + Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp"); + FSDataOutputStream os = fs + .create(newTmpPath, true); + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest + .newInstance(nodeLabels)).getProto().writeDelimitedTo(os); + + if (mgr.isCentralizedConfiguration()) { + // Only save node-to-labels mapping while using centralized configuration + ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(nodeToLabels)).getProto().writeDelimitedTo(os); + } + + os.close(); + + // Rename mirror.new.tmp to mirror.new (will remove .new if it's existed) + Path newPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new"); + if (fs.exists(newPath)) { + fs.delete(newPath, false); + } + fs.rename(newTmpPath, newPath); + + // Remove existing mirror and rename mirror.new to mirror + Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); + if (fs.exists(mirrorPath)) { + fs.delete(mirrorPath, false); + } + fs.rename(newPath, mirrorPath); + } finally { + mgr.readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java index fce663a..64c74c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -36,10 +36,10 @@ @Override public void initNodeLabelStore(Configuration conf) { - this.store = new NodeLabelsStore(this) { + this.store = new NodeLabelsStore() { @Override - public void recover(boolean ignoreNodeToLabelsMappings) + public void recover() throws IOException { } @@ -65,6 +65,8 @@ public void close() throws IOException { // do nothing } }; + + this.store.setNodeLabelsManager(this); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java index 4929f95..82e4e11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -33,13 +34,17 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; -import org.mockito.Mockito; +@RunWith(Parameterized.class) public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { MockNodeLabelManager mgr = null; Configuration conf = null; + String storeClassName = null; private static class MockNodeLabelManager extends CommonNodeLabelsManager { @@ -59,8 +64,15 @@ protected void stopDispatcher() { } } - private FileSystemNodeLabelsStore getStore() { - return (FileSystemNodeLabelsStore) mgr.store; + public TestFileSystemNodeLabelsStore(String className) { + this.storeClassName = className; + } + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList( + new String[][] { { FileSystemNodeLabelsStore.class.getCanonicalName() }, + { NonAppendableFSNodeLabelStore.class.getCanonicalName() } }); } @Before @@ -68,6 +80,7 @@ public void before() throws IOException { mgr = new MockNodeLabelManager(); conf = new Configuration(); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, storeClassName); File tempDir = File.createTempFile("nlb", ".tmp"); tempDir.delete(); tempDir.mkdirs(); @@ -80,7 +93,11 @@ public void before() throws IOException { @After public void after() throws IOException { - getStore().fs.delete(getStore().fsWorkingPath, true); + if (mgr.store instanceof FileSystemNodeLabelsStore) { + FileSystemNodeLabelsStore fsStore = + ((FileSystemNodeLabelsStore) mgr.store); + fsStore.fs.delete(fsStore.fsWorkingPath, true); + } mgr.stop(); } @@ -324,11 +341,12 @@ public void testSerilizationAfterRecovery() throws Exception { @Test public void testRootMkdirOnInitStore() throws Exception { final FileSystem mockFs = Mockito.mock(FileSystem.class); - FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore(mgr) { + FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore() { void setFileSystem(Configuration conf) throws IOException { fs = mockFs; } }; + mockStore.setNodeLabelsManager(mgr); mockStore.fs = mockFs; verifyMkdirsCount(mockStore, true, 0); verifyMkdirsCount(mockStore, false, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java index 2e21d26..bb0b45f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java @@ -37,10 +37,10 @@ @Override public void initNodeLabelStore(Configuration conf) { - this.store = new NodeLabelsStore(this) { + this.store = new NodeLabelsStore() { @Override - public void recover(boolean ignoreNodeToLabelsMappings) + public void recover() throws IOException { // do nothing }