diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeToLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeToLabels.java new file mode 100644 index 0000000..79eaaec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeToLabels.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class NodeToLabels { + @Public + @Evolving + public static NodeToLabels newInstance(String node, List<String> labels) { + NodeToLabels record = Records.newRecord(NodeToLabels.class); + record.setLabels(labels); + record.setNode(node); + return record; + } + + @Public + @Evolving + public abstract void setNode(String node); + + @Public + @Evolving + public abstract String getNode(); + + @Public + @Evolving + public abstract void setLabels(List<String> labels); + + @Public + @Evolving + public abstract List<String> getLabels(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 034ec4f..61e62a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1299,6 +1299,19 @@ public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy"; public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY .name(); + + public static final String LABEL_PREFIX = YARN_PREFIX + "labels."; + + /** URI for NodeLabelManager */ + public static final String FS_NODE_LABEL_STORE_URI = LABEL_PREFIX + + "fs.store.uri"; + public static final String FS_NODE_LABEL_STORE_RETRY_POLICY_SPEC = + LABEL_PREFIX + "fs.retry-policy-spec"; + public static final String DEFAULT_FS_NODE_LABEL_STORE_RETRY_POLICY_SPEC = + "2000, 500"; + + /** Class of node label manager */ + public static final String NODE_LABEL_MANAGER_CLS = LABEL_PREFIX + "class"; public YarnConfiguration() { super(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsRequest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsRequest.java new file mode 100644 index 0000000..dadf1b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsRequest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class AddLabelsRequest { + public static AddLabelsRequest newInstance(Set labels) { + AddLabelsRequest request = + Records.newRecord(AddLabelsRequest.class); + request.setLabels(labels); + return request; + } + + @Public + @Evolving + public abstract void setLabels(Set labels); + + @Public + @Evolving + public abstract Set getLabels(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsResponse.java new file mode 100644 index 0000000..4d50b4f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AddLabelsResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class AddLabelsResponse { + public static AddLabelsResponse newInstance() { + return Records.newRecord(AddLabelsResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsRequest.java new file mode 100644 index 0000000..35e8d1b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsRequest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class RemoveLabelsRequest { + public static RemoveLabelsRequest newInstance(Set partitions) { + RemoveLabelsRequest request = + Records.newRecord(RemoveLabelsRequest.class); + request.setLabels(partitions); + return request; + } + + @Public + @Evolving + public abstract void setLabels(Set partitions); + + @Public + @Evolving + public abstract Set getLabels(); +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsResponse.java new file mode 100644 index 0000000..efd7d9b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoveLabelsResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class RemoveLabelsResponse { + public static RemoveLabelsResponse newInstance() { + return Records.newRecord(RemoveLabelsResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsRequest.java new file mode 100644 index 0000000..490a2e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsRequest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class SetNodeToLabelsRequest { + public static SetNodeToLabelsRequest newInstance( + Map> map) { + SetNodeToLabelsRequest request = + Records.newRecord(SetNodeToLabelsRequest.class); + request.setNodeToLabels(map); + return request; + } + + @Public + @Evolving + public abstract void setNodeToLabels(Map> map); + + @Public + @Evolving + public abstract Map> getNodeToLabels(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsResponse.java new file mode 100644 index 0000000..c3f016d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SetNodeToLabelsResponse.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Evolving +public abstract class SetNodeToLabelsResponse { + public static SetNodeToLabelsResponse newInstance() { + return Records.newRecord(SetNodeToLabelsResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 4637f03..958908a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -75,6 +75,32 @@ message UpdateNodeResourceRequestProto { message UpdateNodeResourceResponseProto { } +message AddLabelsRequestProto { + repeated string labels = 1; +} + +message AddLabelsResponseProto { +} + +message RemoveLabelsRequestProto { + repeated string labels = 1; +} + +message RemoveLabelsResponseProto { +} + +message NodeToLabelsProto { + optional string node = 1; + repeated string labels = 2; +} + +message SetNodeToLabelsRequestProto { + repeated NodeToLabelsProto nodeToLabels = 1; +} + +message SetNodeToLabelsResponseProto { + +} ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeToLabelsPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeToLabelsPBImpl.java new file mode 100644 index 0000000..47fd394 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeToLabelsPBImpl.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.NodeToLabels; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodeToLabelsProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodeToLabelsProtoOrBuilder; + +public class NodeToLabelsPBImpl extends NodeToLabels { + List<String> labels; + NodeToLabelsProto proto = NodeToLabelsProto + .getDefaultInstance(); + NodeToLabelsProto.Builder builder = null; + boolean viaProto = false; + + public NodeToLabelsPBImpl() { + this.builder = NodeToLabelsProto.newBuilder(); + } + + public NodeToLabelsPBImpl(NodeToLabelsProto proto) { + this.proto = proto; + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NodeToLabelsProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.labels != null && !this.labels.isEmpty()) { + builder.addAllLabels(this.labels); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + public NodeToLabelsProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void initLabels() { + if (this.labels != null) { + return; + } + NodeToLabelsProtoOrBuilder p = viaProto ? proto : builder; + this.labels = new ArrayList<String>(); + this.labels.addAll(p.getLabelsList()); + } + + @Override + public void setLabels(List<String> labels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearLabels(); + } + this.labels = labels; + } + + @Override + public List<String> getLabels() { + initLabels(); + return this.labels; + } + + @Override + public void setNode(String node) { + maybeInitBuilder(); + if (node == null) { + builder.clearNode(); + return; + } + builder.setNode(node); + } + + @Override + public String getNode() { + NodeToLabelsProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNode()) { + return null; + } + return (p.getNode()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/FileSystemNodeLabelManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/FileSystemNodeLabelManager.java new file mode 100644 index 0000000..a1a42b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/FileSystemNodeLabelManager.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetNodeToLabelsRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.SetNodeToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SetNodeToLabelsRequestPBImpl; + +import com.google.common.collect.Sets; + +public class FileSystemNodeLabelManager extends NodeLabelManager { + protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot"; + protected static final String MIRROR_FILENAME = "nodelabel.mirror"; + protected static final String EDITLOG_FILENAME = "nodelabel.editlog"; + + Path fsWorkingPath; + Path rootDirPath; + FileSystem fs; + FSDataOutputStream editlogOs; + Path editLogPath; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + fsWorkingPath = + new Path(conf.get(YarnConfiguration.FS_NODE_LABEL_STORE_URI, "/tmp/")); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + + setFileSystem(conf); + + // mkdir of root dir path + fs.mkdirs(rootDirPath); + + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + try { + fs.close(); + editlogOs.close(); + } catch (Exception e) { + LOG.warn("Exception happened whiling shutting down,", e); + } + + super.serviceStop(); + } + + private void setFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + confCopy.setBoolean("dfs.client.retry.policy.enabled", true); + String retryPolicy = + confCopy.get(YarnConfiguration.FS_NODE_LABEL_STORE_RETRY_POLICY_SPEC, + YarnConfiguration.DEFAULT_FS_NODE_LABEL_STORE_RETRY_POLICY_SPEC); + confCopy.set("dfs.client.retry.policy.spec", retryPolicy); + fs = fsWorkingPath.getFileSystem(confCopy); + + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. 
+ if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem)fs).getRaw(); + } + } + + private void ensureAppendEditlogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + private void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + @Override + public void persistNodeToLabelsChanges( + Map<String, Set<String>> nodeToLabels) throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal()); + ((SetNodeToLabelsRequestPBImpl) SetNodeToLabelsRequest + .newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void persistAddingLabels(Set<String> labels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal()); + ((AddLabelsRequestPBImpl) AddLabelsRequest.newInstance(labels)).getProto() + .writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void persistRemovingLabels(Collection<String> labels) + throws IOException { + ensureAppendEditlogFile(); + editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal()); + ((RemoveLabelsRequestPBImpl) RemoveLabelsRequest.newInstance(Sets + .newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs); + ensureCloseEditlogFile(); + } + + @Override + public void recover() throws IOException { + /* + * Steps of recover + * 1) Read from last mirror (from mirror or mirror.old) + * 2) Read from last edit log, and apply such edit log + * 3) Write new mirror to mirror.writing + * 4) Rename mirror to mirror.old + * 5) Move mirror.writing to mirror + * 6) Remove mirror.old + * 7) Remove edit log and create a new empty edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME); + Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old"); + + FSDataInputStream is = null; + if (fs.exists(mirrorPath)) { + is = fs.open(mirrorPath); + } else if (fs.exists(oldMirrorPath)) { + is = fs.open(oldMirrorPath); + } + + if (null != is) { + Set<String> labels = + new AddLabelsRequestPBImpl( + AddLabelsRequestProto.parseDelimitedFrom(is)).getLabels(); + Map<String, Set<String>> nodeToLabels = + new SetNodeToLabelsRequestPBImpl( + SetNodeToLabelsRequestProto.parseDelimitedFrom(is)) + .getNodeToLabels(); + addLabels(labels); + setLabelsOnMultipleNodes(nodeToLabels); + is.close(); + } + + // Open and process editlog + editLogPath = new Path(rootDirPath, EDITLOG_FILENAME); + if (fs.exists(editLogPath)) { + is = fs.open(editLogPath); + + while (true) { + try { + // read edit log one by one + SerializedLogType type = SerializedLogType.values()[is.readInt()]; + + switch (type) { + case ADD_LABELS: { + Collection<String> partitions = + AddLabelsRequestProto.parseDelimitedFrom(is) + .getLabelsList(); + addLabels(Sets.newHashSet(partitions.iterator())); + break; + } + case REMOVE_LABELS: { + Collection<String> partitions = + RemoveLabelsRequestProto.parseDelimitedFrom(is) + .getLabelsList(); + removeLabels(partitions); + break; + } + case NODE_TO_LABELS: { + Map<String, Set<String>> map = + new SetNodeToLabelsRequestPBImpl( + SetNodeToLabelsRequestProto.parseDelimitedFrom(is)) + .getNodeToLabels(); + setLabelsOnMultipleNodes(map); + break; + } + } + } catch (EOFException e) { + // EOF hit, break + break; + } + } + } + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".writing"); + FSDataOutputStream os = fs.create(writingMirrorPath, true); + ((AddLabelsRequestPBImpl) AddLabelsRequestPBImpl +
.newInstance(super.existingLabels)).getProto().writeDelimitedTo(os); + ((SetNodeToLabelsRequestPBImpl) SetNodeToLabelsRequest + .newInstance(super.nodeToLabels)).getProto().writeDelimitedTo(os); + os.close(); + + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/MemoryNodeLabelManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/MemoryNodeLabelManager.java new file mode 100644 index 0000000..9851ba8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/MemoryNodeLabelManager.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +public class MemoryNodeLabelManager extends NodeLabelManager { + @Override + public void recover() throws IOException { + // do nothing + } + + @Override + public void persistNodeToLabelsChanges( + Map> nodeToPartitions) throws IOException { + // do nothing + } + + @Override + public void persistAddingLabels(Set partition) throws IOException { + // do nothing + } + + @Override + public void persistRemovingLabels(Collection partitions) + throws IOException { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelConfiguration.java new file mode 100644 index 0000000..bbec1bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelConfiguration.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +public class NodeLabelConfiguration extends Configuration { + public final static String PREFIX = "yarn.node-label."; + + public final static String LABELS_KEY = PREFIX + "labels"; + public final static String NODES_KEY = PREFIX + "nodes"; + + public final static String NODE_LABELS_SUFFIX = ".labels"; + + public static enum LoadStrategy { + INITIAL, REPLACE, MERGE, CLEAR + } + + public NodeLabelConfiguration(String absolutePath) { + super(false); + Path absoluteLocalPath = new Path("file", "", absolutePath); + addResource(absoluteLocalPath); + } + + public Set getLabels() { + Set labelsSet = new HashSet(); + String[] labels = getStrings(LABELS_KEY); + if (null != labels) { + for (String l : labels) { + if (l.trim().isEmpty()) { + continue; + } + labelsSet.add(l); + } + } + return labelsSet; + } + + public Map> getNodeToLabels() { + Map> nodeToLabels = new HashMap>(); + + String[] nodes = getStrings(NODES_KEY); + if (null != nodes) { + for (String n : nodes) { + if (n.trim().isEmpty()) { + continue; + } + String[] labels = getStrings(NODES_KEY + "." + n + NODE_LABELS_SUFFIX); + nodeToLabels.put(n, new HashSet()); + + if (labels != null) { + for (String l : labels) { + if (l.trim().isEmpty()) { + continue; + } + nodeToLabels.get(n).add(l); + } + } + } + } + + return nodeToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManager.java new file mode 100644 index 0000000..844add9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManager.java @@ -0,0 +1,1099 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.label.event.AddLabelsEvent; +import org.apache.hadoop.yarn.label.event.NodeLabelManagerEvent; +import org.apache.hadoop.yarn.label.event.NodeLabelManagerEventType; +import org.apache.hadoop.yarn.label.event.RemoveLabelsEvent; +import org.apache.hadoop.yarn.label.event.StoreNodeToLabelsEvent; +import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public abstract class NodeLabelManager extends AbstractService { + protected static final Log LOG = LogFactory.getLog(NodeLabelManager.class); + private static final int MAX_LABEL_LENGTH = 255; + public static final Set<String> EMPTY_STRING_SET = Collections + .unmodifiableSet(new HashSet<String>(0)); + public static final String ANY = "*"; + public static final Set<String> ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY); + + /** + * If a user doesn't specify the label of a queue or node, it belongs to + * DEFAULT_LABEL + */ + public static final String NO_LABEL = ""; + + private enum NodeLabelManagerState { + DEFAULT + }; + + protected enum SerializedLogType { + ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS + } + + private static final StateMachineFactory<NodeLabelManager, NodeLabelManagerState, NodeLabelManagerEventType, NodeLabelManagerEvent> stateMachineFactory = + new StateMachineFactory<NodeLabelManager, NodeLabelManagerState, NodeLabelManagerEventType, NodeLabelManagerEvent>( + NodeLabelManagerState.DEFAULT) + .addTransition(NodeLabelManagerState.DEFAULT, + NodeLabelManagerState.DEFAULT, + NodeLabelManagerEventType.STORE_NODE_TO_LABELS, + new StoreNodeToLabelsTransition()) + .addTransition(NodeLabelManagerState.DEFAULT, + NodeLabelManagerState.DEFAULT, + NodeLabelManagerEventType.ADD_LABELS, new AddLabelsTransition()) + .addTransition(NodeLabelManagerState.DEFAULT, + NodeLabelManagerState.DEFAULT, + NodeLabelManagerEventType.REMOVE_LABELS, + new RemoveLabelsTransition()); + + private final StateMachine<NodeLabelManagerState, NodeLabelManagerEventType, NodeLabelManagerEvent> stateMachine; + + private static class StoreNodeToLabelsTransition implements + SingleArcTransition<NodeLabelManager, NodeLabelManagerEvent> { + @Override + public void transition(NodeLabelManager store,
NodeLabelManagerEvent event) { + if (!(event instanceof StoreNodeToLabelsEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + StoreNodeToLabelsEvent e = (StoreNodeToLabelsEvent) event; + try { + store.persistNodeToLabelsChanges(e.getNodeToLabels()); + } catch (IOException ioe) { + LOG.error("Error removing store node to label:" + ioe.getMessage()); + throw new YarnRuntimeException(ioe); + } + }; + } + + private static class AddLabelsTransition implements + SingleArcTransition<NodeLabelManager, NodeLabelManagerEvent> { + @Override + public void transition(NodeLabelManager store, NodeLabelManagerEvent event) { + if (!(event instanceof AddLabelsEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + AddLabelsEvent e = (AddLabelsEvent) event; + try { + store.persistAddingLabels(e.getLabels()); + } catch (IOException ioe) { + LOG.error("Error storing new label:" + ioe.getMessage()); + throw new YarnRuntimeException(ioe); + } + }; + } + + private static class RemoveLabelsTransition implements + SingleArcTransition<NodeLabelManager, NodeLabelManagerEvent> { + @Override + public void transition(NodeLabelManager store, NodeLabelManagerEvent event) { + if (!(event instanceof RemoveLabelsEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + RemoveLabelsEvent e = (RemoveLabelsEvent) event; + try { + store.persistRemovingLabels(e.getLabels()); + } catch (IOException ioe) { + LOG.error("Error removing label on filesystem:" + ioe.getMessage()); + throw new YarnRuntimeException(ioe); + } + }; + } + + protected Dispatcher dispatcher; + + // existing labels in the cluster + protected Set<String> existingLabels = new ConcurrentSkipListSet<String>(); + + // node to labels and label to nodes + protected Map<String, Set<String>> nodeToLabels = + new ConcurrentHashMap<String, Set<String>>(); + private Map<String, Set<String>> labelToNodes = + new ConcurrentHashMap<String, Set<String>>(); + + // running node and label to running nodes + private ConcurrentMap<String, Set<String>> labelToActiveNodes = + new ConcurrentHashMap<String, Set<String>>(); + private Set<String> runningNodes = new ConcurrentSkipListSet<String>(); + + // recording label to queues and queue to Resource + private ConcurrentMap<String, Set<String>> queueToLabels = + new ConcurrentHashMap<String, Set<String>>(); + private ConcurrentMap<String, Resource> queueToResource = + new ConcurrentHashMap<String, Resource>(); + + // node name -> map<NodeId, Resource> + // This is used to calculate how much resource in each node, use a nested map + // because it is possible multiple NMs launch in a node + private Map<String, Map<NodeId, Resource>> nodeToResource = + new ConcurrentHashMap<String, Map<NodeId, Resource>>(); + private Map<String, Resource> labelToResource = + new ConcurrentHashMap<String, Resource>(); + + private final ReadLock readLock; + private final WriteLock writeLock; + private AccessControlList adminAcl; + + public NodeLabelManager() { + super(NodeLabelManager.class.getName()); + stateMachine = stateMachineFactory.make(this); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + // for UT purpose + protected void initDispatcher(Configuration conf) { + // create async handler + dispatcher = new AsyncDispatcher(); + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.init(conf); + asyncDispatcher.setDrainEventsOnStop(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + adminAcl = new AccessControlList(conf.get( + YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + + // recover from previous state + recover(); + } + + private void checkAccess(String method) throws IOException { + // TODO, add permission checking here, make sure
only admin can invoke + // this method + } + + Map<String, Set<String>> getDefaultNodeToLabels(NodeLabelConfiguration conf) + throws IOException { + return conf.getNodeToLabels(); + } + + protected void addDefaultNodeToLabels( + Map<String, Set<String>> defaultNodeToLabels) throws IOException { + Set<String> labels = new HashSet<String>(); + for (Set<String> t : defaultNodeToLabels.values()) { + labels.addAll(t); + } + addLabels(labels); + + setLabelsOnMultipleNodes(defaultNodeToLabels); + } + + // for UT purpose + protected void startDispatcher() { + // start dispatcher + AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher; + asyncDispatcher.start(); + } + + @Override + protected void serviceStart() throws Exception { + // init dispatcher only when service start, because recover will happen in + // service init, we don't want to trigger any event handling at that time. + initDispatcher(getConfig()); + + dispatcher.register(NodeLabelManagerEventType.class, + new ForwardingEventHandler()); + + startDispatcher(); + } + + /** + * Store node -> label to filesystem + */ + public abstract void persistNodeToLabelsChanges( + Map<String, Set<String>> nodeToLabels) throws IOException; + + /** + * Store new label to filesystem + */ + public abstract void persistAddingLabels(Set<String> label) + throws IOException; + + /* + * Remove label from filesystem + */ + public abstract void persistRemovingLabels(Collection<String> labels) + throws IOException; + + /** + * Recover node label from file system + */ + public abstract void recover() + throws IOException; + + private boolean isAlphabetic(char c) { + if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c)) { + return true; + } + return false; + } + + protected void checkLabelName(String label) throws IOException { + if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) { + throw new IOException("label added is empty or exceeds " + + MAX_LABEL_LENGTH + " characters"); + } + + boolean valid = true; + for (int i = 0; i < label.length(); i++) { + if (!(isAlphabetic(label.charAt(i)) || Character.isDigit(label.charAt(i)) + || label.charAt(i) == '-' || label.charAt(i) == '_')) { + valid = false; + break; + } + } + + if (!valid) { + throw new IOException("label contains characters besides digits," + + " alphabetics, \"-\" and \"_\", label=" + label); + } + + if (label.startsWith("-") || label.startsWith("_")) { + throw new IOException("Label shouldn't start with \"-\" or \"_\", label=" + + label); + } + } + + protected String normalizeLabel(String label) { + if (label != null) { + return label.toLowerCase(); + } + return NO_LABEL; + } + + protected Set<String> normalizeLabels(Set<String> labels) { + Set<String> newLabels = new HashSet<String>(); + for (String label : labels) { + newLabels.add(normalizeLabel(label)); + } + return newLabels; + } + + /** + * Add a label to repository + * + * @param label + * the label to add + */ + public void addLabel(String label) throws IOException { + checkLabelName(label); + addLabels(ImmutableSet.of(label)); + } + + /** + * Add multiple labels to repository + * + * @param labels new labels added + */ + @SuppressWarnings("unchecked") + public void addLabels(Set<String> labels) throws IOException { + checkAccess("addLabels"); + + if (null == labels || labels.isEmpty()) { + return; + } + + try { + writeLock.lock(); + Set<String> normalizedLabels = new HashSet<String>(); + for (String label : labels) { + checkLabelName(label); + String normalizedLabel = normalizeLabel(label); + this.existingLabels.add(normalizedLabel); + normalizedLabels.add(normalizedLabel); + } + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new
AddLabelsEvent(normalizedLabels)); + } + + LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]"); + } finally { + writeLock.unlock(); + } + } + + /** + * Remove a label from repository + * + * @param labelToRemove + * @throws IOException + */ + public void removeLabel(String labelToRemove) throws IOException { + removeLabels(Arrays.asList(labelToRemove)); + } + + private void addNMInNodeAlreadyHasNM(Set<String> labels, Resource newNMRes) { + try { + writeLock.lock(); + for (String label : labels) { + Resource originalRes = labelToResource.get(label); + labelToResource.put(label, Resources.add(newNMRes, originalRes)); + } + for (String queueName : queueToLabels.keySet()) { + if (isNodeUsableByQueue(labels, queueName)) { + Resource res = queueToResource.get(queueName); + Resources.addTo(res, newNMRes); + } + } + } finally { + writeLock.unlock(); + } + } + + private void + removeNMToNodeAlreadyHasNM(Set<String> labels, Resource newNMRes) { + try { + writeLock.lock(); + for (String label : labels) { + Resource originalRes = labelToResource.get(label); + labelToResource.put(label, Resources.subtract(originalRes, newNMRes)); + } + for (String queueName : queueToLabels.keySet()) { + if (isNodeUsableByQueue(labels, queueName)) { + Resource res = queueToResource.get(queueName); + Resources.subtractFrom(res, newNMRes); + } + } + } finally { + writeLock.unlock(); + } + } + + private enum UpdateLabelResourceType { + ACTIVE, + DEACTIVE, + UPDATE_LABEL + } + + private void updateLabelResource(Map<String, Set<String>> addLabelToNodes, + Map<String, Set<String>> removeLabelToNodes, + Map<String, Set<String>> originalNodeToLabels, + UpdateLabelResourceType updateType) { + try { + writeLock.lock(); + + // process add label to nodes + if (addLabelToNodes != null) { + for (Entry<String, Set<String>> entry : addLabelToNodes.entrySet()) { + String label = entry.getKey(); + Set<String> nodes = entry.getValue(); + + // update label to active nodes + labelToActiveNodes.putIfAbsent(label, new HashSet<String>()); + labelToActiveNodes.get(label).addAll(addLabelToNodes.get(label)); + + // update label to resource + Resource res = Resource.newInstance(0, 0); + for (String node : nodes) { + Resources.addTo(res, getResourceOfNode(node)); + } + Resource originalRes = labelToResource.get(label); + labelToResource.put(label, + originalRes == null ?
res : Resources.add(res, originalRes)); + } + } + + // process remove label to nodes + if (removeLabelToNodes != null) { + for (Entry> entry : removeLabelToNodes.entrySet()) { + String label = entry.getKey(); + Set nodes = entry.getValue(); + + // update label to active nodes + if (labelToActiveNodes.get(label) != null) { + labelToActiveNodes.get(label).removeAll(nodes); + } + + // update label to resource + Resource res = Resource.newInstance(0, 0); + for (String node : nodes) { + Resources.addTo(res, getResourceOfNode(node)); + } + Resource originalRes = labelToResource.get(label); + labelToResource.put(label, Resources.subtract(originalRes, res)); + } + } + + // update queue to resource + for (Entry> originEntry : originalNodeToLabels + .entrySet()) { + String node = originEntry.getKey(); + Set originLabels = originEntry.getValue(); + Set nowLabels = nodeToLabels.get(node); + + for (String q : queueToResource.keySet()) { + Resource queueResource = queueToResource.get(q); + boolean pastUsable = isNodeUsableByQueue(originLabels, q); + boolean nowUsable = isNodeUsableByQueue(nowLabels, q); + + if (updateType == UpdateLabelResourceType.UPDATE_LABEL) { + if (pastUsable && !nowUsable) { + Resources.subtractFrom(queueResource, getResourceOfNode(node)); + } else if (!pastUsable && nowUsable) { + Resources.addTo(queueResource, getResourceOfNode(node)); + } + } else if (updateType == UpdateLabelResourceType.ACTIVE) { + if (nowUsable) { + Resources.addTo(queueResource, getResourceOfNode(node)); + } + } else if (updateType == UpdateLabelResourceType.DEACTIVE) { + if (nowUsable) { + Resources.subtractFrom(queueResource, getResourceOfNode(node)); + } + } + } + } + } finally { + writeLock.unlock(); + } + } + + private boolean isNodeUsableByQueue(Set nodeLabels, String queueName) { + // node without any labels can be accessed by any queue + if (nodeLabels == null || nodeLabels.isEmpty() + || (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) { + return true; + } + + for (String label : nodeLabels) { + if (queueToLabels.containsKey(queueName) + && queueToLabels.get(queueName).contains(label)) { + return true; + } + } + + return false; + } + + private void removeAll(Map> map, String key, + Set set) { + if (set == null) { + return; + } + if (!map.containsKey(key)) { + return; + } + map.get(key).remove(set); + } + + private void remove(Map> map, String key, String value) { + if (value == null) { + return; + } + if (!map.containsKey(key)) { + return; + } + map.get(key).remove(value); + if (map.get(key).isEmpty()) { + map.remove(key); + } + } + + private void add(Map> map, String key, String value) { + if (value == null) { + return; + } + if (!map.containsKey(key)) { + map.put(key, new HashSet()); + } + map.get(key).add(value); + } + + /** + * Remove multiple labels labels from repository + * + * @param labelsToRemove + * @throws IOException + */ + @SuppressWarnings("unchecked") + public void removeLabels(Collection labelsToRemove) + throws IOException { + checkAccess("removeLabels"); + + if (null == labelsToRemove || labelsToRemove.isEmpty()) { + return; + } + + try { + writeLock.lock(); + + Map> labelToActiveNodeAdded = + new HashMap>(); + Map> labelToActiveNodeRemoved = + new HashMap>(); + Map> originalNodeToLabels = + new HashMap>(); + + for (String label : labelsToRemove) { + label = normalizeLabel(label); + if (label == null || label.isEmpty() || !existingLabels.contains(label)) { + throw new IOException("Label to be removed is null or empty"); + } + + // remove it from label + 
this.existingLabels.remove(label); + + // remove it from labelToActiveNodes + Set<String> activeNodes = labelToActiveNodes.remove(label); + removeAll(labelToActiveNodeRemoved, label, activeNodes); + + // update node -> labels + Set<String> nodes = labelToNodes.remove(label); + + // update node to labels + if (nodes != null) { + for (String node : nodes) { + if (!originalNodeToLabels.containsKey(node) + && nodeToLabels.containsKey(node)) { + Set<String> originalLabels = + Sets.newHashSet(nodeToLabels.get(node)); + originalNodeToLabels.put(node, originalLabels); + } + remove(nodeToLabels, node, label); + // if we don't have any labels in a node now, we will mark this node + // as no label + if (runningNodes.contains(node) + && (nodeToLabels.get(node) == null || nodeToLabels.get(node) + .isEmpty())) { + add(labelToActiveNodeAdded, NO_LABEL, node); + } + } + } + } + + // update resource + updateLabelResource(labelToActiveNodeAdded, labelToActiveNodeRemoved, + originalNodeToLabels, UpdateLabelResourceType.UPDATE_LABEL); + + // create event to remove labels + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new RemoveLabelsEvent(labelsToRemove)); + } + + LOG.info("Remove labels: [" + + StringUtils.join(labelsToRemove.iterator(), ",") + "]"); + } finally { + writeLock.unlock(); + } + } + + private void verifyNodeLabel(String node, String label) throws IOException { + if (node == null || node.isEmpty()) { + throw new IOException( + "Trying to change label on a node, but node is null or empty"); + } + if (label != null && !label.isEmpty() && !existingLabels.contains(label)) { + throw new IOException("Label doesn't exist in repository, " + + "have you added it before? label=" + label); + } + } + + private Set<String> emptyWhenNull(Set<String> s) { + if (s == null) { + return new HashSet<String>(); + } + return s; + } + + /** + * Set node -> label, if label is null or empty, it means remove label on node + * + * @param newNodeToLabels node -> label map + */ + @SuppressWarnings("unchecked") + public void + setLabelsOnMultipleNodes(Map<String, Set<String>> newNodeToLabels) + throws IOException { + checkAccess("setLabelsOnMultipleNodes"); + + if (null == newNodeToLabels || newNodeToLabels.isEmpty()) { + return; + } + + try { + writeLock.lock(); + + Map<String, Set<String>> labelToActiveNodeAdded = + new HashMap<String, Set<String>>(); + Map<String, Set<String>> labelToActiveNodeRemoved = + new HashMap<String, Set<String>>(); + Map<String, Set<String>> originalNodeToLabels = + new HashMap<String, Set<String>>(); + + for (Entry<String, Set<String>> e : newNodeToLabels.entrySet()) { + String node = e.getKey(); + Set<String> labels = e.getValue(); + + // normalize and verify + labels = normalizeLabels(labels); + for (String label : labels) { + verifyNodeLabel(node, label); + } + + // handling labels removed + Set<String> originalLabels = emptyWhenNull(nodeToLabels.get(node)); + Set<String> difference = Sets.difference(originalLabels, labels); + for (String removedLabel : difference) { + remove(labelToNodes, removedLabel, node); + if (runningNodes.contains(node)) { + add(labelToActiveNodeRemoved, removedLabel, node); + } + } + + // Mark this node as "no-label" if we set an empty set of labels + if (labels.isEmpty() && !originalLabels.isEmpty() + && runningNodes.contains(node)) { + add(labelToActiveNodeAdded, NO_LABEL, node); + } + + // handling labels added + for (String addedLabel : Sets.difference(labels, originalLabels)) { + add(labelToNodes, addedLabel, node); + if (runningNodes.contains(node)) { + add(labelToActiveNodeAdded, addedLabel, node); + } + } + + // Mark this node not "no-label" if we set a non-empty set of labels + if (!labels.isEmpty() && originalLabels.isEmpty() + && runningNodes.contains(node)) {
add(labelToActiveNodeRemoved, NO_LABEL, node); + } + } + + // save original node to labels + for (String node : newNodeToLabels.keySet()) { + if (!originalNodeToLabels.containsKey(node) + && nodeToLabels.containsKey(node)) { + Set<String> originalLabels = Sets.newHashSet(nodeToLabels.get(node)); + originalNodeToLabels.put(node, originalLabels); + } + } + + // update node to labels and label to nodes + nodeToLabels.putAll(newNodeToLabels); + + updateLabelResource(labelToActiveNodeAdded, labelToActiveNodeRemoved, + originalNodeToLabels, UpdateLabelResourceType.UPDATE_LABEL); + + if (null != dispatcher) { + dispatcher.getEventHandler().handle( + new StoreNodeToLabelsEvent(newNodeToLabels)); + } + + // shows node->labels we added + LOG.info("setLabelsOnMultipleNodes:"); + for (Entry<String, Set<String>> entry : newNodeToLabels.entrySet()) { + LOG.info(" host=" + entry.getKey() + ", labels=[" + + StringUtils.join(entry.getValue().iterator(), ",") + "]"); + } + } finally { + writeLock.unlock(); + } + } + + public void setLabelsOnSingleNode(String node, Set<String> labels) + throws IOException { + setLabelsOnMultipleNodes(ImmutableMap.of(node, labels)); + } + + private Resource getResourceOfNode(String node) { + Resource res = Resource.newInstance(0, 0); + if (nodeToResource.containsKey(node)) { + for (Resource r : nodeToResource.get(node).values()) { + Resources.addTo(res, r); + } + } + return res; + } + + /** + * Set label on node, if label is null or empty, it means remove label on node + * + * @param node + * @param labels + */ + public void removeLabelsOnNodes(String node, Set<String> labels) + throws IOException { + setLabelsOnMultipleNodes(ImmutableMap.of(node, labels)); + } + + public Resource getResourceWithNoLabel() throws IOException { + return getResourceWithLabel(NO_LABEL); + } + + public Resource getResourceWithLabel(String label) { + label = normalizeLabel(label); + try { + readLock.lock(); + Resource res = labelToResource.get(label); + return res == null ? Resources.none() : res; + } finally { + readLock.unlock(); + } + } + + /* + * Following methods are used for setting if a node is up and running, which + * will be used by this#getActiveNodesByLabel and getLabelResource + */ + public void activeNode(NodeId node, Resource resource) { + try { + writeLock.lock(); + String nodeName = node.getHost(); + + // put this node to nodeToResource + if (!nodeToResource.containsKey(nodeName)) { + nodeToResource.put(nodeName, new ConcurrentHashMap<NodeId, Resource>()); + } + + if (null != nodeToResource.get(nodeName).put(node, resource)) { + String msg = + "This shouldn't happen, trying to active node," + + " but there's already a node here, " + + "please check what happened. NodeId=" + node.toString(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } + + // add it to running nodes + runningNodes.add(nodeName); + + // update resources + Set<String> labels = nodeToLabels.get(nodeName); + labels = + (labels == null || labels.isEmpty()) ?
ImmutableSet.of(NO_LABEL) + : labels; + + if (nodeToResource.get(nodeName).size() <= 1) { + Map> labelToActiveNodeAdded = + new HashMap>(); + for (String label : labels) { + labelToActiveNodeAdded.put(label, ImmutableSet.of(nodeName)); + } + Map> originalNodeTolabels = + new HashMap>(); + if (nodeToLabels.containsKey(nodeName)) { + originalNodeTolabels.put(nodeName, nodeToLabels.get(nodeName)); + } else { + originalNodeTolabels.put(nodeName, NodeLabelManager.EMPTY_STRING_SET); + } + updateLabelResource(labelToActiveNodeAdded, null, originalNodeTolabels, + UpdateLabelResourceType.ACTIVE); + } else { + // Support more than two NMs in a same node + addNMInNodeAlreadyHasNM(labels, resource); + } + } finally { + writeLock.unlock(); + } + } + + public void deactiveNode(NodeId node) { + try { + writeLock.lock(); + String nodeName = node.getHost(); + Resource res = null; + + // add add it to running node + runningNodes.add(nodeName); + + // update resources + Set labels = nodeToLabels.get(nodeName); + labels = + labels == null || labels.isEmpty() ? ImmutableSet.of(NO_LABEL) + : labels; + + // this is last NM in this node + if (nodeToResource.get(nodeName).size() == 1) { + Map> labelToActiveNodeRemoved = + new HashMap>(); + for (String label : labels) { + labelToActiveNodeRemoved.put(label, ImmutableSet.of(nodeName)); + labelToActiveNodes.get(label).remove(nodeName); + } + Map> originalNodeTolabels = + new HashMap>(); + if (nodeToLabels.containsKey(nodeName)) { + originalNodeTolabels.put(nodeName, nodeToLabels.get(nodeName)); + } else { + originalNodeTolabels.put(nodeName, NodeLabelManager.EMPTY_STRING_SET); + } + updateLabelResource(null, labelToActiveNodeRemoved, + originalNodeTolabels, UpdateLabelResourceType.DEACTIVE); + } + + // update node to resource + if (null == (res = nodeToResource.get(nodeName).remove(node))) { + String msg = + "This shouldn't happen, trying to active node," + + " but there's already a node here, " + + "please check what happened. 
NodeId=" + node.toString(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } + + // if there are more NMs remaining on this node + if (nodeToResource.get(nodeName).size() > 0) { + // Support more than one NM on the same node + removeNMToNodeAlreadyHasNM(labels, res); + } + } finally { + writeLock.unlock(); + } + } + + public void updateNodeResource(NodeId node, Resource newResource) { + deactiveNode(node); + activeNode(node, newResource); + } + + /** + * Remove labels on given nodes + * + * @param nodes + * nodes from which labels will be removed + */ + public void removeLabelsOnNodes(Collection nodes) throws IOException { + Map> map = + new HashMap>(nodes.size()); + for (String node : nodes) { + map.put(node, EMPTY_STRING_SET); + } + setLabelsOnMultipleNodes(map); + } + + /** + * Remove labels on given node + * + * @param node + * node from which labels will be removed + */ + public void removeLabelOnNode(String node) throws IOException { + removeLabelsOnNodes(Arrays.asList(node)); + } + + /** + * Clear all labels and related mapping from NodeLabelManager + * @throws IOException + */ + public void clearAllLabels() throws IOException { + try { + writeLock.lock(); + Set dupLabels = Sets.newHashSet(getLabels()); + removeLabels(dupLabels); + } finally { + writeLock.unlock(); + } + } + + /** + * Get active nodes with the given label + * + * @param label + * @return nodes that have the given label assigned + */ + public Collection getActiveNodesByLabel(String label) + throws IOException { + label = normalizeLabel(label); + try { + readLock.lock(); + return Collections.unmodifiableCollection(labelToActiveNodes.get(label)); + } finally { + readLock.unlock(); + } + } + + /** + * Get number of nodes with the given label + * + * @param label + * @return number of active nodes that have the given label + */ + public int getNumOfNodesByLabel(String label) throws IOException { + label = normalizeLabel(label); + try { + readLock.lock(); + Collection nodes = labelToActiveNodes.get(label); + return nodes == null ? 0 : nodes.size(); + } finally { + readLock.unlock(); + } + } + + /** + * Get mapping of nodes to labels + * + * @return nodes to labels map + */ + public Map> getNodesToLabels() throws IOException { + try { + readLock.lock(); + return Collections.unmodifiableMap(nodeToLabels); + } finally { + readLock.unlock(); + } + } + + public Set getLabelsOnNode(String node) { + Set label = nodeToLabels.get(node); + return label == null ? EMPTY_STRING_SET : Collections + .unmodifiableSet(label); + } + + /** + * Get existing valid labels in repository + * + * @return existing valid labels in repository + */ + public Set getLabels() throws IOException { + try { + readLock.lock(); + return Collections.unmodifiableSet(existingLabels); + } finally { + readLock.unlock(); + } + } + + public boolean containsLabel(String label) { + try { + readLock.lock(); + return label != null + && (label.isEmpty() || existingLabels.contains(label.toLowerCase())); + } finally { + readLock.unlock(); + } + } + + public void reinitializeQueueLabels(Map> queueToLabels) { + try { + writeLock.lock(); + // clear before set + this.queueToLabels.clear(); + queueToResource.clear(); + + for (Entry> entry : queueToLabels.entrySet()) { + String queue = entry.getKey(); + Set labels = entry.getValue(); + labels = labels.isEmpty() ? 
ImmutableSet.of(NO_LABEL) : labels; + if (labels.contains(ANY)) { + continue; + } + + this.queueToLabels.put(queue, labels); + + // empty label node can be accessed by any queue + Set dupLabels = new HashSet(labels); + dupLabels.add(""); + Set accessedNodes = new HashSet(); + Resource totalResource = Resource.newInstance(0, 0); + for (String label : dupLabels) { + if (labelToActiveNodes.containsKey(label)) { + for (String node : labelToActiveNodes.get(label)) { + if (!accessedNodes.contains(node)) { + accessedNodes.add(node); + Resources.addTo(totalResource, getResourceOfNode(node)); + } + } + } + } + queueToResource.put(queue, totalResource); + } + } finally { + writeLock.unlock(); + } + } + + public Resource getQueueResource(String queueName, Set queueLabels, + Resource clusterResource) { + if (queueLabels.contains(ANY)) { + return clusterResource; + } + Resource res = queueToResource.get(queueName); + return res == null ? clusterResource : res; + } + + // Dispatcher related code + protected void handleStoreEvent(NodeLabelManagerEvent event) { + try { + this.stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); + } + } + + private final class ForwardingEventHandler implements + EventHandler { + + @Override + public void handle(NodeLabelManagerEvent event) { + if (isInState(STATE.STARTED)) { + handleStoreEvent(event); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManagerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManagerFactory.java new file mode 100644 index 0000000..647a80a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelManagerFactory.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class NodeLabelManagerFactory { + + public static NodeLabelManager getNodeLabelManager(Configuration conf) { + NodeLabelManager mgr = ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.NODE_LABEL_MANAGER_CLS, + FileSystemNodeLabelManager.class, NodeLabelManager.class), + conf); + return mgr; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelUtils.java new file mode 100644 index 0000000..3cd81c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/NodeLabelUtils.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; + +public class NodeLabelUtils { + private static final String PARSE_FAILED_MSG = + "Failed to parse node-> labels json"; + private static final String LABELS_KEY = + "labels"; + + /** + * Get node to labels from JSON like: + * + * { + * "host1": { + * "labels": [ + * "x", + * "y", + * "z" + * ] + * }, + * "host2": { + * "labels": [ + * "a", + * "b", + * "c" + * ] + * }, + * "host3": { + * "labels": [] + * } + * } + * + * @param json + * @return node to labels map + */ + public static Map> getNodeToLabelsFromJson(String json) + throws IOException { + Map> nodeToLabels = new HashMap>(); + + if (json == null || json.trim().isEmpty()) { + return nodeToLabels; + } + + JsonParser parser = new JsonParser(); + JsonElement node; + try { + node = parser.parse(json); + } catch (JsonParseException e) { + throw new IOException(e); + } + + if (node.isJsonObject()) { + JsonObject obj = node.getAsJsonObject(); + for (Map.Entry entry : obj.entrySet()) { + String nodeName = entry.getKey().trim(); + if (nodeName.isEmpty()) { + throw new IOException(PARSE_FAILED_MSG); + } + nodeToLabels.put(nodeName, new HashSet()); + + if (entry.getValue().isJsonObject()) { + JsonObject labelObj = entry.getValue().getAsJsonObject(); + if (labelObj.entrySet().size() > 0) { + JsonElement labelsElement = labelObj.get(LABELS_KEY); + if (labelsElement == null || !labelsElement.isJsonArray()) { + throw new IOException(PARSE_FAILED_MSG); + } + JsonArray labelsArray = labelsElement.getAsJsonArray(); + for (JsonElement item : labelsArray) { + nodeToLabels.get(nodeName).add(item.getAsString()); + } + } + } else { + throw new IOException(PARSE_FAILED_MSG); + } + } + } else { + throw new IOException(PARSE_FAILED_MSG); + } + + return nodeToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/AddLabelsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/AddLabelsEvent.java new file mode 100644 index 0000000..744c2ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/AddLabelsEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label.event; + +import java.util.Set; + +public class AddLabelsEvent extends NodeLabelManagerEvent { + private Set partitions; + + public AddLabelsEvent(Set partitions) { + super(NodeLabelManagerEventType.ADD_LABELS); + this.partitions = partitions; + } + + public Set getLabels() { + return partitions; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEvent.java new file mode 100644 index 0000000..8ab76ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class NodeLabelManagerEvent extends + AbstractEvent { + public NodeLabelManagerEvent(NodeLabelManagerEventType type) { + super(type); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEventType.java new file mode 100644 index 0000000..91fd47d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/NodeLabelManagerEventType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label.event; + +public enum NodeLabelManagerEventType { + REMOVE_LABELS, + ADD_LABELS, + STORE_NODE_TO_LABELS +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/RemoveLabelsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/RemoveLabelsEvent.java new file mode 100644 index 0000000..5a029e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/RemoveLabelsEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label.event; + +import java.util.Collection; + +public class RemoveLabelsEvent extends NodeLabelManagerEvent { + private Collection labels; + + public RemoveLabelsEvent(Collection labels) { + super(NodeLabelManagerEventType.REMOVE_LABELS); + this.labels = labels; + } + + public Collection getLabels() { + return labels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/StoreNodeToLabelsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/StoreNodeToLabelsEvent.java new file mode 100644 index 0000000..5f2573f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/label/event/StoreNodeToLabelsEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label.event; + +import java.util.Map; +import java.util.Set; + +public class StoreNodeToLabelsEvent extends NodeLabelManagerEvent { + private Map> nodeToLabels; + + public StoreNodeToLabelsEvent(Map> nodeToLabels) { + super(NodeLabelManagerEventType.STORE_NODE_TO_LABELS); + this.nodeToLabels = nodeToLabels; + } + + public Map> getNodeToLabels() { + return nodeToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsRequestPBImpl.java new file mode 100644 index 0000000..26dbe7a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsRequestPBImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddLabelsRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddLabelsRequest; + +public class AddLabelsRequestPBImpl extends AddLabelsRequest { + Set labels; + AddLabelsRequestProto proto = AddLabelsRequestProto + .getDefaultInstance(); + AddLabelsRequestProto.Builder builder = null; + boolean viaProto = false; + + public AddLabelsRequestPBImpl() { + this.builder = AddLabelsRequestProto.newBuilder(); + } + + public AddLabelsRequestPBImpl(AddLabelsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AddLabelsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.labels != null && !this.labels.isEmpty()) { + builder.addAllLabels(this.labels); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + public AddLabelsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void initLabels() { + if (this.labels != null) { + return; + } + AddLabelsRequestProtoOrBuilder p = viaProto ? 
proto : builder; + this.labels = new HashSet(); + this.labels.addAll(p.getLabelsList()); + } + + @Override + public void setLabels(Set labels) { + maybeInitBuilder(); + if (labels == null || labels.isEmpty()) { + builder.clearLabels(); + } + this.labels = labels; + } + + @Override + public Set getLabels() { + initLabels(); + return this.labels; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsResponsePBImpl.java new file mode 100644 index 0000000..74aa930 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/AddLabelsResponsePBImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddLabelsResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddLabelsResponse; + +import com.google.protobuf.TextFormat; + +public class AddLabelsResponsePBImpl extends AddLabelsResponse { + + AddLabelsResponseProto proto = AddLabelsResponseProto + .getDefaultInstance(); + AddLabelsResponseProto.Builder builder = null; + boolean viaProto = false; + + public AddLabelsResponsePBImpl() { + builder = AddLabelsResponseProto.newBuilder(); + } + + public AddLabelsResponsePBImpl(AddLabelsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public AddLabelsResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsRequestPBImpl.java new file mode 100644 index 0000000..b86d2cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsRequestPBImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveLabelsRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveLabelsRequest; + +public class RemoveLabelsRequestPBImpl extends RemoveLabelsRequest { + Set labels; + RemoveLabelsRequestProto proto = RemoveLabelsRequestProto + .getDefaultInstance(); + RemoveLabelsRequestProto.Builder builder = null; + boolean viaProto = false; + + public RemoveLabelsRequestPBImpl() { + this.builder = RemoveLabelsRequestProto.newBuilder(); + } + + public RemoveLabelsRequestPBImpl(RemoveLabelsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RemoveLabelsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.labels != null && !this.labels.isEmpty()) { + builder.addAllLabels(this.labels); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + public RemoveLabelsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void initLabels() { + if (this.labels != null) { + return; + } + RemoveLabelsRequestProtoOrBuilder p = viaProto ? 
proto : builder; + this.labels = new HashSet(); + this.labels.addAll(p.getLabelsList()); + } + + @Override + public void setLabels(Set partitions) { + maybeInitBuilder(); + if (partitions == null || partitions.isEmpty()) { + builder.clearLabels(); + } + this.labels = partitions; + } + + @Override + public Set getLabels() { + initLabels(); + return this.labels; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsResponsePBImpl.java new file mode 100644 index 0000000..935d8f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveLabelsResponsePBImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveLabelsResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveLabelsResponse; + +import com.google.protobuf.TextFormat; + +public class RemoveLabelsResponsePBImpl extends RemoveLabelsResponse { + + RemoveLabelsResponseProto proto = RemoveLabelsResponseProto + .getDefaultInstance(); + RemoveLabelsResponseProto.Builder builder = null; + boolean viaProto = false; + + public RemoveLabelsResponsePBImpl() { + builder = RemoveLabelsResponseProto.newBuilder(); + } + + public RemoveLabelsResponsePBImpl(RemoveLabelsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public RemoveLabelsResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsRequestPBImpl.java new file mode 100644 index 0000000..50a8ccb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsRequestPBImpl.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodeToLabelsProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetNodeToLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetNodeToLabelsRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.SetNodeToLabelsRequest; + +import com.google.common.collect.Sets; + +public class SetNodeToLabelsRequestPBImpl extends + SetNodeToLabelsRequest { + SetNodeToLabelsRequestProto proto = SetNodeToLabelsRequestProto + .getDefaultInstance(); + SetNodeToLabelsRequestProto.Builder builder = null; + boolean viaProto = false; + + private Map> nodeToLabels; + + public SetNodeToLabelsRequestPBImpl() { + this.builder = SetNodeToLabelsRequestProto.newBuilder(); + } + + public SetNodeToLabelsRequestPBImpl(SetNodeToLabelsRequestProto proto) { + this.proto = proto; + this.viaProto = true; + } + + private void initNodeToLabels() { + if (this.nodeToLabels != null) { + return; + } + SetNodeToLabelsRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getNodeToLabelsList(); + this.nodeToLabels = new HashMap>(); + + for (NodeToLabelsProto c : list) { + this.nodeToLabels + .put(c.getNode(), Sets.newHashSet(c.getLabelsList())); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SetNodeToLabelsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void addNodeToLabelsToProto() { + maybeInitBuilder(); + builder.clearNodeToLabels(); + if (nodeToLabels == null) { + return; + } + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator>> iter = nodeToLabels.entrySet() + .iterator(); + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public NodeToLabelsProto next() { + Entry> now = iter.next(); + return NodeToLabelsProto.newBuilder().setNode(now.getKey()) + .addAllLabels(now.getValue()).build(); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + }; + } + }; + builder.addAllNodeToLabels(iterable); + } + + private void mergeLocalToBuilder() { + if (this.nodeToLabels != null) { + addNodeToLabelsToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + public SetNodeToLabelsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public Map> getNodeToLabels() { + initNodeToLabels(); + return this.nodeToLabels; + } + + @Override + public void setNodeToLabels(Map> map) { + initNodeToLabels(); + nodeToLabels.clear(); + nodeToLabels.putAll(map); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsResponsePBImpl.java new file mode 100644 index 0000000..053004d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SetNodeToLabelsResponsePBImpl.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SetNodeToLabelsResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.SetNodeToLabelsResponse; + +import com.google.protobuf.TextFormat; + +public class SetNodeToLabelsResponsePBImpl extends SetNodeToLabelsResponse { + + SetNodeToLabelsResponseProto proto = SetNodeToLabelsResponseProto + .getDefaultInstance(); + SetNodeToLabelsResponseProto.Builder builder = null; + boolean viaProto = false; + + public SetNodeToLabelsResponsePBImpl() { + builder = SetNodeToLabelsResponseProto.newBuilder(); + } + + public SetNodeToLabelsResponsePBImpl(SetNodeToLabelsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SetNodeToLabelsResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/NodeLabelTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/NodeLabelTestBase.java new file mode 100644 index 0000000..218eff4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/NodeLabelTestBase.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class NodeLabelTestBase { + public static void assertMapEquals(Map> m1, + ImmutableMap> m2) { + Assert.assertEquals(m1.size(), m2.size()); + for (String k : m1.keySet()) { + Assert.assertTrue(m2.containsKey(k)); + assertCollectionEquals(m1.get(k), m2.get(k)); + } + } + + public static void assertMapContains(Map> m1, + ImmutableMap> m2) { + for (String k : m2.keySet()) { + Assert.assertTrue(m1.containsKey(k)); + assertCollectionEquals(m1.get(k), m2.get(k)); + } + } + + public static void assertCollectionEquals(Collection c1, + Collection c2) { + Assert.assertEquals(c1.size(), c2.size()); + Iterator i1 = c1.iterator(); + Iterator i2 = c2.iterator(); + while (i1.hasNext()) { + Assert.assertEquals(i1.next(), i2.next()); + } + } + + public static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/SyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/SyncDispatcher.java new file mode 100644 index 0000000..25c0e1a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/SyncDispatcher.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; + +public class SyncDispatcher implements Dispatcher { + @SuppressWarnings("rawtypes") + EventHandler handler = null; + + @SuppressWarnings("rawtypes") + @Override + public EventHandler getEventHandler() { + return handler; + } + + @SuppressWarnings("rawtypes") + @Override + public void register(Class eventType, EventHandler handler) { + this.handler = handler; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestFileSystemNodeLabelManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestFileSystemNodeLabelManager.java new file mode 100644 index 0000000..cc00617 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestFileSystemNodeLabelManager.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestFileSystemNodeLabelManager extends NodeLabelTestBase { + MockFileSystemNodeLabelManager mgr = null; + Configuration conf = null; + + private static class MockFileSystemNodeLabelManager extends + FileSystemNodeLabelManager { + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new SyncDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + } + + @Before + public void before() throws IOException { + mgr = new MockFileSystemNodeLabelManager(); + conf = new Configuration(); + File tempDir = File.createTempFile("nlb", ".tmp"); + tempDir.delete(); + tempDir.mkdirs(); + tempDir.deleteOnExit(); + conf.set(YarnConfiguration.FS_NODE_LABEL_STORE_URI, + tempDir.getAbsolutePath()); + mgr.init(conf); + mgr.start(); + } + + @After + public void after() throws IOException { + mgr.fs.delete(mgr.rootDirPath, true); + mgr.stop(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 10000) + public void testRecoverWithMirror() throws Exception { + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.addLabel("p4"); + mgr.addLabels(toSet("p5", "p6")); + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n1", toSet("p1"), "n2", + toSet("p2"))); + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n3", toSet("p3"), "n4", + toSet("p4"), "n5", toSet("p5"), "n6", toSet("p6"), "n7", toSet("p6"))); + + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeLabel("p1"); + mgr.removeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockFileSystemNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getLabels().size()); + Assert.assertTrue(mgr.getLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodesToLabels(), ImmutableMap.of("n2", + toSet("p2"), "n4", toSet("p4"), "n6", toSet("p6"), "n7", toSet("p6"))); + + // stutdown mgr and start a new mgr + mgr.stop(); + mgr = new MockFileSystemNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getLabels().size()); + Assert.assertTrue(mgr.getLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodesToLabels(), ImmutableMap.of("n2", + toSet("p2"), "n4", toSet("p4"), "n6", toSet("p6"), "n7", toSet("p6"))); + } + + 
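// Unlike testRecoverWithMirror above, the manager here is restarted only once after the edits, so recovery presumably has to replay the edit log alone rather than load a previously written mirror.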
@SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 10000) + public void testEditlogRecover() throws Exception { + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.addLabel("p4"); + mgr.addLabels(toSet("p5", "p6")); + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n1", toSet("p1"), "n2", + toSet("p2"))); + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n3", toSet("p3"), "n4", + toSet("p4"), "n5", toSet("p5"), "n6", toSet("p6"), "n7", toSet("p6"))); + + /* + * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7 + */ + + mgr.removeLabel("p1"); + mgr.removeLabels(Arrays.asList("p3", "p5")); + + /* + * After removed p2: n2 p4: n4 p6: n6, n7 + */ + // shutdown mgr and start a new mgr + mgr.stop(); + + mgr = new MockFileSystemNodeLabelManager(); + mgr.init(conf); + + // check variables + Assert.assertEquals(3, mgr.getLabels().size()); + Assert.assertTrue(mgr.getLabels().containsAll( + Arrays.asList("p2", "p4", "p6"))); + + assertMapContains(mgr.getNodesToLabels(), ImmutableMap.of( + "n2", toSet("p2"), "n4", toSet("p4"), "n6", toSet("p6"), "n7", + toSet("p6"))); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelManager.java new file mode 100644 index 0000000..38ad79c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelManager.java @@ -0,0 +1,601 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.label; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class TestNodeLabelManager extends NodeLabelTestBase { + private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0); + private final Resource SMALL_NODE = Resource.newInstance(100, 0); + private final Resource LARGE_NODE = Resource.newInstance(1000, 0); + + DumbNodeLabelManager mgr = null; + + private static class DumbNodeLabelManager extends NodeLabelManager { + Map> lastNodeToLabels = null; + Collection lastAddedlabels = null; + Collection lastRemovedlabels = null; + + @Override + public void persistRemovingLabels(Collection labels) + throws IOException { + lastRemovedlabels = labels; + } + + @Override + public void recover() throws IOException { + // do nothing here + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new SyncDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + public void persistNodeToLabelsChanges( + Map> nodeToLabels) throws IOException { + this.lastNodeToLabels = nodeToLabels; + } + + @Override + public void persistAddingLabels(Set labels) throws IOException { + this.lastAddedlabels = labels; + } + } + + @Before + public void before() { + mgr = new DumbNodeLabelManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test(timeout = 5000) + public void testAddRemovelabel() throws Exception { + // Add some label + mgr.addLabel("hello"); + assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello")); + + mgr.addLabel("world"); + mgr.addLabels(toSet("hello1", "world1")); + assertCollectionEquals(mgr.lastAddedlabels, + Sets.newHashSet("hello1", "world1")); + + Assert.assertTrue(mgr.getLabels().containsAll( + Sets.newHashSet("hello", "world", "hello1", "world1"))); + + // try to remove null, empty and non-existed label, should fail + for (String p : Arrays.asList(null, NodeLabelManager.NO_LABEL, "xx")) { + boolean caught = false; + try { + mgr.removeLabel(p); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("remove label should fail " + + "when label is null/empty/non-existed", caught); + } + + // Remove some label + mgr.removeLabel("hello"); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("hello")); + Assert.assertTrue(mgr.getLabels().containsAll( + Arrays.asList("world", "hello1", "world1"))); + + mgr.removeLabels(Arrays.asList("hello1", "world1", "world")); + Assert.assertTrue(mgr.lastRemovedlabels.containsAll(Sets.newHashSet( + "hello1", "world1", "world"))); + Assert.assertTrue(mgr.getLabels().isEmpty()); + } + + @Test(timeout = 5000) + public void testAddRemovelabelIgnoreCase() throws Exception { + // Add some label + mgr.addLabel("HeLlO"); + + assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello")); + Assert.assertFalse(mgr.getLabels().containsAll(Arrays.asList("HeLlO"))); + 
Assert.assertTrue(mgr.getLabels().containsAll(Arrays.asList("hello"))); + } + + @Test(timeout = 5000) + public void testAddInvalidlabel() throws IOException { + boolean caught = false; + try { + mgr.addLabel(null); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("null label should not add to repo", caught); + + caught = false; + try { + mgr.addLabel(NodeLabelManager.NO_LABEL); + } catch (IOException e) { + caught = true; + } + + Assert.assertTrue("empty label should not add to repo", caught); + + caught = false; + try { + mgr.addLabel("-?"); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("invalid label charactor should not add to repo", caught); + + caught = false; + try { + mgr.addLabel(StringUtils.repeat("c", 257)); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("too long label should not add to repo", caught); + + caught = false; + try { + mgr.addLabel("-aaabbb"); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot start with \"-\"", caught); + + caught = false; + try { + mgr.addLabel("_aaabbb"); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("label cannot start with \"_\"", caught); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 5000) + public void testSetRemoveLabelsOnNodes() throws Exception { + // set a label on a node, but label doesn't exist + boolean caught = false; + try { + mgr.setLabelsOnSingleNode("node", toSet("label")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("trying to set a label to a node but " + + "label doesn't exist in repository should fail", caught); + + // set a label on a node, but node is null or empty + try { + mgr.setLabelsOnSingleNode(NodeLabelManager.NO_LABEL, toSet("label")); + } catch (IOException e) { + caught = true; + } + Assert.assertTrue("trying to add a empty node but succeeded", caught); + + // set node->label one by one + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.setLabelsOnSingleNode("n1", toSet("p1")); + mgr.setLabelsOnSingleNode("n1", toSet("p2")); + mgr.setLabelsOnSingleNode("n2", toSet("p3")); + assertMapEquals(mgr.getNodesToLabels(), + ImmutableMap.of("n1", toSet("p2"), "n2", toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of("n2", toSet("p3"))); + + // set bunch of node->label + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n3", toSet("p3"), "n1", + toSet("p1"))); + assertMapEquals(mgr.getNodesToLabels(), ImmutableMap.of("n1", toSet("p1"), + "n2", toSet("p3"), "n3", toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, + ImmutableMap.of("n3", toSet("p3"), "n1", toSet("p1"))); + + // remove label on node + mgr.removeLabelOnNode("n1"); + assertMapEquals(mgr.getNodesToLabels(), + ImmutableMap.of("n1", NodeLabelManager.EMPTY_STRING_SET, "n2", + toSet("p3"), "n3", toSet("p3"))); + assertMapEquals(mgr.lastNodeToLabels, + ImmutableMap.of("n1", NodeLabelManager.EMPTY_STRING_SET)); + + // remove labels on node + mgr.removeLabelsOnNodes(Arrays.asList("n2", "n3")); + assertMapEquals(mgr.nodeToLabels, ImmutableMap.of("n1", + NodeLabelManager.EMPTY_STRING_SET, "n2", + NodeLabelManager.EMPTY_STRING_SET, "n3", + NodeLabelManager.EMPTY_STRING_SET)); + assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of("n2", + NodeLabelManager.EMPTY_STRING_SET, "n3", + NodeLabelManager.EMPTY_STRING_SET)); + } + + @Test(timeout = 5000) + public void testRemovelabelWithNodes() throws Exception { + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.setLabelsOnSingleNode("n1", 
toSet("p1")); + mgr.setLabelsOnSingleNode("n2", toSet("p2")); + mgr.setLabelsOnSingleNode("n3", toSet("p3")); + + mgr.removeLabel("p1"); + assertMapEquals(mgr.getNodesToLabels(), + ImmutableMap.of("n2", toSet("p2"), "n3", toSet("p3"))); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1")); + + mgr.removeLabels(Arrays.asList("p2", "p3")); + Assert.assertTrue(mgr.getNodesToLabels().isEmpty()); + Assert.assertTrue(mgr.getLabels().isEmpty()); + assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3")); + } + + @Test(timeout = 5000) + public void testNodeActiveDeactiveUpdate() throws Exception { + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.setLabelsOnSingleNode("n1", toSet("p1")); + mgr.setLabelsOnSingleNode("n2", toSet("p2")); + mgr.setLabelsOnSingleNode("n3", toSet("p3")); + + Assert.assertEquals(mgr.getResourceWithLabel("p1"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceWithLabel("p2"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceWithLabel("p3"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceWithLabel(NodeLabelManager.NO_LABEL), + EMPTY_RESOURCE); + + // activate two NMs on n1, one large and one small + mgr.activeNode(NodeId.newInstance("n1", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n1", 1), LARGE_NODE); + Assert.assertEquals(mgr.getResourceWithLabel("p1"), + Resources.add(SMALL_NODE, LARGE_NODE)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 1); + + // change the large NM to small, check if resource updated + mgr.updateNodeResource(NodeId.newInstance("n1", 1), SMALL_NODE); + Assert.assertEquals(mgr.getResourceWithLabel("p1"), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 1); + + // deactivate one NM, and check if resource updated + mgr.deactiveNode(NodeId.newInstance("n1", 1)); + Assert.assertEquals(mgr.getResourceWithLabel("p1"), SMALL_NODE); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 1); + + // continue deactivating, check if resource updated + mgr.deactiveNode(NodeId.newInstance("n1", 0)); + Assert.assertEquals(mgr.getResourceWithLabel("p1"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 0); + + // Add the two NMs on n1 back + mgr.activeNode(NodeId.newInstance("n1", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n1", 1), LARGE_NODE); + + // And remove p1; now the two NMs should fall back to the default label + mgr.removeLabel("p1"); + Assert.assertEquals(mgr.getResourceWithLabel(NodeLabelManager.NO_LABEL), + Resources.add(SMALL_NODE, LARGE_NODE)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 5000) + public void testUpdateNodeLabelWithActiveNode() throws Exception { + mgr.addLabels(toSet("p1", "p2", "p3")); + mgr.setLabelsOnSingleNode("n1", toSet("p1")); + mgr.setLabelsOnSingleNode("n2", toSet("p2")); + mgr.setLabelsOnSingleNode("n3", toSet("p3")); + + // activate one small NM on each of n1, n2 and n3 + mgr.activeNode(NodeId.newInstance("n1", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n2", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n3", 0), SMALL_NODE); + + // change label of n1 to p2 + mgr.setLabelsOnSingleNode("n1", toSet("p2")); + Assert.assertEquals(mgr.getResourceWithLabel("p1"), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 0); + Assert.assertEquals(mgr.getResourceWithLabel("p2"), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p2"), 2); + Assert.assertEquals(mgr.getResourceWithLabel("p3"), SMALL_NODE); + 
Assert.assertEquals(mgr.getNumOfNodesByLabel("p3"), 1); + + // add more labels + mgr.addLabels(toSet("p4", "p5", "p6")); + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n4", toSet("p1"), "n5", + toSet("p2"), "n6", toSet("p3"), "n7", toSet("p4"), "n8", toSet("p5"))); + + // now the label -> nodes mapping is: + // p1 : n4 + // p2 : n1, n2, n5 + // p3 : n3, n6 + // p4 : n7 + // p5 : n8 + // no-label : n9 + + // activate these nodes + mgr.activeNode(NodeId.newInstance("n4", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n5", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n6", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n7", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n8", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("n9", 0), SMALL_NODE); + + // check variables + Assert.assertEquals(mgr.getResourceWithLabel("p1"), SMALL_NODE); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 1); + Assert.assertEquals(mgr.getResourceWithLabel("p2"), + Resources.multiply(SMALL_NODE, 3)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p2"), 3); + Assert.assertEquals(mgr.getResourceWithLabel("p3"), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p3"), 2); + Assert.assertEquals(mgr.getResourceWithLabel("p4"), + Resources.multiply(SMALL_NODE, 1)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p4"), 1); + Assert.assertEquals(mgr.getResourceWithLabel("p5"), + Resources.multiply(SMALL_NODE, 1)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p5"), 1); + Assert.assertEquals(mgr.getResourceWithLabel(""), + Resources.multiply(SMALL_NODE, 1)); + Assert.assertEquals(mgr.getNumOfNodesByLabel(""), 1); + + // change a bunch of node -> label mappings + // n4 -> p2 + // n7 -> empty + // n5 -> p1 + // n8 -> empty + // n9 -> p1 + // + // the label -> nodes mapping now becomes: + // p1 : n5, n9 + // p2 : n1, n2, n4 + // p3 : n3, n6 + // p4 : [ ] + // p5 : [ ] + // no label: n8, n7 + mgr.setLabelsOnMultipleNodes((Map) ImmutableMap.of("n4", toSet("p2"), "n7", + NodeLabelManager.EMPTY_STRING_SET, "n5", toSet("p1"), "n8", + NodeLabelManager.EMPTY_STRING_SET, "n9", toSet("p1"))); + + // check variables + Assert.assertEquals(mgr.getResourceWithLabel("p1"), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p1"), 2); + Assert.assertEquals(mgr.getResourceWithLabel("p2"), + Resources.multiply(SMALL_NODE, 3)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p2"), 3); + Assert.assertEquals(mgr.getResourceWithLabel("p3"), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p3"), 2); + Assert.assertEquals(mgr.getResourceWithLabel("p4"), + Resources.multiply(SMALL_NODE, 0)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p4"), 0); + Assert.assertEquals(mgr.getResourceWithLabel("p5"), + Resources.multiply(SMALL_NODE, 0)); + Assert.assertEquals(mgr.getNumOfNodesByLabel("p5"), 0); + Assert.assertEquals(mgr.getResourceWithLabel(""), + Resources.multiply(SMALL_NODE, 2)); + Assert.assertEquals(mgr.getNumOfNodesByLabel(""), 2); + } + + @Test + public void testGetQueueResource() throws Exception { + Resource clusterResource = Resource.newInstance(9999, 1); + + /* + * Node->Labels: + * host1 : red, blue + * host2 : blue, yellow + * host3 : yellow + * host4 : + */ + mgr.addLabels(toSet("red", "blue", "yellow")); + mgr.setLabelsOnSingleNode("host1", toSet("red", "blue")); + mgr.setLabelsOnSingleNode("host2", toSet("blue", "yellow")); + mgr.setLabelsOnSingleNode("host3", toSet("yellow")); + + // activate one small NM on each host +
mgr.activeNode(NodeId.newInstance("host1", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("host2", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("host3", 0), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("host4", 0), SMALL_NODE); + + // reinitialize queue labels + Set<String> q1Label = toSet("red", "blue"); + Set<String> q2Label = toSet("blue", "yellow"); + Set<String> q3Label = toSet("yellow"); + Set<String> q4Label = NodeLabelManager.EMPTY_STRING_SET; + Set<String> q5Label = toSet(NodeLabelManager.ANY); + + Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); + queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after changing some labels + * Node->Labels: + * host1 : blue + * host2 : + * host3 : red, yellow + * host4 : + */ + mgr.setLabelsOnMultipleNodes(ImmutableMap.of( + "host3", toSet("red", "yellow"), + "host1", toSet("blue"), + "host2", NodeLabelManager.EMPTY_STRING_SET)); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 4), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after deactivating/activating some nodes + * Node->Labels: + * (deactivated) host1 : blue + * host2 : + * (deactivated and then activated) host3 : red, yellow + * host4 : + */ + mgr.deactiveNode(NodeId.newInstance("host1", 0)); + mgr.deactiveNode(NodeId.newInstance("host3", 0)); + mgr.activeNode(NodeId.newInstance("host3", 0), SMALL_NODE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after refreshing queues: + * Q1: blue + * Q2: red, blue + * Q3: red + * Q4: + * Q5: ANY + */ + q1Label = toSet("blue"); + q2Label = toSet("blue", "red"); + q3Label = toSet("red"); + q4Label = NodeLabelManager.EMPTY_STRING_SET; + q5Label = toSet(NodeLabelManager.ANY); + + queueToLabels.clear(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); +
queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 2), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Active NMs in nodes already have NM + * Node->Labels: + * host2 : + * host3 : red, yellow (3 NMs) + * host4 : (2 NMs) + */ + mgr.activeNode(NodeId.newInstance("host3", 1), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("host3", 2), SMALL_NODE); + mgr.activeNode(NodeId.newInstance("host4", 1), SMALL_NODE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 6), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 6), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Deactive NMs in nodes already have NMs + * Node->Labels: + * host2 : + * host3 : red, yellow (2 NMs) + * host4 : (0 NMs) + */ + mgr.deactiveNode(NodeId.newInstance("host3", 2)); + mgr.deactiveNode(NodeId.newInstance("host4", 1)); + mgr.deactiveNode(NodeId.newInstance("host4", 0)); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_NODE, 1), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_NODE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelUtils.java new file mode 100644 index 0000000..323cdca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/label/TestNodeLabelUtils.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.label; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.label.NodeLabelUtils; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestNodeLabelUtils extends NodeLabelTestBase { + private void assertParseShouldFail(String json, boolean shouldFail) { + try { + NodeLabelUtils.getNodeToLabelsFromJson(json); + if (shouldFail) { + Assert.fail("should fail:" + (json == null ? "" : json)); + } + } catch (IOException e) { + if (!shouldFail) { + Assert.fail("shouldn't fail:" + (json == null ? "" : json)); + } + } + } + + private void assertParseFailed(String json) { + assertParseShouldFail(json, true); + } + + @Test + public void testParseNodeToLabelsFromJson() throws IOException { + // empty and null + assertParseShouldFail(null, false); + assertParseShouldFail("", false); + + // empty host + String json = + "{\"host1\":{\"labels\":[\"x\",\"y\"]}, \"\":{\"labels\":[\"x\",\"y\"]}}"; + assertParseFailed(json); + + // not a JSON object + json = + "[\"host1\":{\"labels\":[\"x\",\"y\"]}, \"\":{\"labels\":[\"x\",\"y\"]}]"; + assertParseFailed(json); + + // does not have a labels key + json = + "[\"host1\":{\"labels\":[\"x\",\"y\"]}, \"\":{\"tag\":[\"x\",\"y\"]}]"; + assertParseFailed(json); + + // labels is not an array + json = "{\"host1\":{\"labels\":{\"x\":\"y\"}}}"; + assertParseFailed(json); + + // not valid JSON + json = "[ }"; + assertParseFailed(json); + + // normal case #1 + json = + "{\"host1\":{\"labels\":[\"x\",\"y\"]}, \"host2\":{\"labels\":[\"x\",\"y\"]}}"; + Map<String, Set<String>> nodeToLabels = + NodeLabelUtils.getNodeToLabelsFromJson(json); + assertMapEquals(nodeToLabels, + ImmutableMap.of("host1", toSet("x", "y"), "host2", toSet("x", "y"))); + + // normal case #2 + json = + "{\"host1\":{\"labels\":[\"x\",\"y\"]}, \"host2\":{\"labels\":[\"a\",\"b\"]}}"; + nodeToLabels = NodeLabelUtils.getNodeToLabelsFromJson(json); + assertMapEquals(nodeToLabels, + ImmutableMap.of("host1", toSet("x", "y"), "host2", toSet("a", "b"))); + + // labels is empty #1 + json = "{\"host1\":{\"labels\":[\"x\",\"y\"]}, \"host2\":{\"labels\":[]}}"; + nodeToLabels = NodeLabelUtils.getNodeToLabelsFromJson(json); + assertMapEquals(nodeToLabels, ImmutableMap.of("host1", toSet("x", "y"), + "host2", new HashSet<String>())); + + // labels is empty #2 + json = "{\"host1\":{\"labels\":[\"x\",\"y\"]}, \"host2\":{}}"; + nodeToLabels = NodeLabelUtils.getNodeToLabelsFromJson(json); + assertMapEquals(nodeToLabels, ImmutableMap.of("host1", toSet("x", "y"), + "host2", new HashSet<String>())); + } +}
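Reviewer note: the JSON accepted by NodeLabelUtils.getNodeToLabelsFromJson (as exercised by TestNodeLabelUtils above) maps each host name to an object with an optional "labels" array. The sketch below is illustrative only, not the patch's implementation: it shows one way such input could be turned into a Map<String, Set<String>> using Jackson 2.x (an assumed dependency; the class name NodeToLabelsJsonSketch is hypothetical). The behaviour is inferred from the test cases: null/empty input yields an empty map, an empty host name or a non-array "labels" value is rejected, and a missing "labels" key means the host simply has no labels.

// Illustrative sketch only; assumes com.fasterxml.jackson (Jackson 2.x) is on
// the classpath. The actual NodeLabelUtils in this patch may be implemented
// differently.
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NodeToLabelsJsonSketch {
  public static Map<String, Set<String>> parse(String json) throws IOException {
    Map<String, Set<String>> nodeToLabels = new HashMap<String, Set<String>>();
    if (json == null || json.isEmpty()) {
      // null/empty input is treated as "no mapping" rather than an error
      return nodeToLabels;
    }
    ObjectMapper mapper = new ObjectMapper();
    // invalid JSON (e.g. "[ }") makes readTree throw, which surfaces as IOException
    JsonNode root = mapper.readTree(json);
    if (!root.isObject()) {
      throw new IOException("top-level element must be a JSON object");
    }
    Iterator<Map.Entry<String, JsonNode>> hosts = root.fields();
    while (hosts.hasNext()) {
      Map.Entry<String, JsonNode> host = hosts.next();
      if (host.getKey().isEmpty()) {
        throw new IOException("host name must not be empty");
      }
      Set<String> labels = new HashSet<String>();
      JsonNode labelsNode = host.getValue().get("labels");
      if (labelsNode != null) {
        // a missing "labels" key means no labels; a present one must be an array
        if (!labelsNode.isArray()) {
          throw new IOException("\"labels\" must be a JSON array");
        }
        for (JsonNode label : labelsNode) {
          labels.add(label.asText());
        }
      }
      nodeToLabels.put(host.getKey(), labels);
    }
    return nodeToLabels;
  }
}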