diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
index 394454f..7d2d9b9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java
@@ -57,6 +57,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
@@ -589,4 +591,20 @@ SignalContainerResponse signalToContainer(
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException;
+
+ /**
+ *
+ * The interface for sending scheduler configuration mutation requests for
+ * changing configurations at runtime.
+ *
+ * @param request configuration mutation request
+ * @return an empty response that the update has completed successfully.
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ @Idempotent
+ public SchedulerConfigurationMutationResponse mutateSchedulerConfiguration(
+ SchedulerConfigurationMutationRequest request) throws IOException, YarnException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java
new file mode 100644
index 0000000..1d04d38
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ *
+ * The request sent by the client to the ResourceManager to add or
+ * remove or update scheduler configurations.
+ *
+ *
+ * The request includes the {@link SchedulerConfigurationMutation}
+ *
+ *
+ * @see ApplicationClientProtocol#mutateSchedulerConfiguration(SchedulerConfigurationMutationRequest)
+ */
+
+@Public
+@Unstable
+public abstract class SchedulerConfigurationMutationRequest {
+ public static SchedulerConfigurationMutationRequest newInstance(
+ SchedulerConfigurationMutation schedulerConfigurationMutation) {
+ SchedulerConfigurationMutationRequest request =
+ Records.newRecord(SchedulerConfigurationMutationRequest.class);
+ request.setSchedulerConfigurationMutation(schedulerConfigurationMutation);
+ return request;
+ }
+
+ /**
+ * Get the SchedulerConfigurationMutation.
+ *
+ * @return SchedulerConfigurationMutation
+ */
+ public abstract SchedulerConfigurationMutation getSchedulerConfigurationMutation();
+
+ /**
+ * Set the SchedulerConfigurationMutation.
+ *
+ * @param schedulerConfigurationMutation SchedulerConfigurationMutation
+ */
+ public abstract void setSchedulerConfigurationMutation(
+ SchedulerConfigurationMutation schedulerConfigurationMutation);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java
new file mode 100644
index 0000000..9fe3204
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ *
+ * The response sent by the ResourceManager to the client on add or
+ * remove or update scheduler configurations.
+ *
+ *
+ * A response without exception means that the configuration mutation has completed
+ * successfully.
+ *
+ */
+@Public
+@Unstable
+public abstract class SchedulerConfigurationMutationResponse {
+
+ public static SchedulerConfigurationMutationResponse newInstance() {
+ SchedulerConfigurationMutationResponse response =
+ Records.newRecord(SchedulerConfigurationMutationResponse.class);
+ return response;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java
new file mode 100644
index 0000000..d7f657c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * QueueConfigurationMutation encapsulates mutations for a
+ * single queue's configuration.
+ *
+ * @see SchedulerConfigurationMutation
+ */
+@Public
+@Unstable
+public abstract class QueueConfigurationMutation {
+
+ @Private
+ @Unstable
+ public static QueueConfigurationMutation newInstance(String queueName,
+ SchedulerOperationType operationType, Map configuration) {
+ QueueConfigurationMutation mutation = Records.newRecord(QueueConfigurationMutation.class);
+ mutation.setQueueName(queueName);
+ mutation.setOperationType(operationType);
+ mutation.setConfigurationMutation(configuration);
+ return mutation;
+ }
+
+ /**
+ * Get the queue name of this queue configuration change
+ * @return queue name
+ */
+ @Private
+ @Unstable
+ public abstract String getQueueName();
+
+ /**
+ * Set the queue name of this queue configuration change
+ * @param queueName queue name to set
+ */
+ @Private
+ @Unstable
+ public abstract void setQueueName(String queueName);
+
+ /**
+ * Get the operation type of this queue configuration change
+ * @return SchedulerOperationType of this queue
+ * configuration change
+ */
+ @Private
+ @Unstable
+ public abstract SchedulerOperationType getOperationType();
+
+ /**
+ * Set the SchedulerOperationType of this
+ * queue configuration change
+ * @param operationType operation type of this queue configuration change
+ */
+ @Private
+ @Unstable
+ public abstract void setOperationType(SchedulerOperationType operationType);
+
+ /**
+ * Get the params for this queue configuration change
+ * @return Map of key-value pairs for this configuration change
+ */
+ @Private
+ @Unstable
+ public abstract Map getConfigurationMutation();
+
+ /**
+ * Set the params for this queue configuration change
+ * @param configuration map of key-values for this queue configuration change
+ */
+ @Private
+ @Unstable
+ public abstract void setConfigurationMutation(Map configuration);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java
new file mode 100644
index 0000000..4df76c6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * SchedulerConfigurationMutation encapsulates mutations
+ * for scheduler configuration.
+ * @see ConfigurationMutationRequest
+ */
+@Public
+@Unstable
+public abstract class SchedulerConfigurationMutation {
+
+ @Private
+ @Unstable
+ public static SchedulerConfigurationMutation newInstance(
+ List mutations) {
+ SchedulerConfigurationMutation mutation =
+ Records.newRecord(SchedulerConfigurationMutation.class);
+ mutation.setQueueMutations(mutations);
+ return mutation;
+ }
+
+ /**
+ * Get the queue mutations of this scheduler configuration change.
+ * @return queue mutations
+ */
+ @Private
+ @Unstable
+ public abstract List getQueueMutations();
+
+ /**
+ * Set list of queue mutations for this scheduler configuration change.
+ * @param mutations list of queue mutations
+ */
+ @Private
+ @Unstable
+ public abstract void setQueueMutations(List mutations);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java
new file mode 100644
index 0000000..7fe74b2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Type of scheduler configuration update.
+ */
+@Public
+@Evolving
+public enum SchedulerOperationType {
+ /**
+ * Add queue and set its configurations.
+ */
+ ADD,
+
+ /**
+ * Remove queue.
+ */
+ REMOVE,
+
+ /**
+ * Update queue's configurations.
+ */
+ UPDATE,
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7dd5ce3..4f878cc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -597,6 +597,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
"org.apache.hadoop.yarn.LocalConfigurationProvider";
+ public static final String USE_MUTABLE_QUEUE_CONFIG = YARN_PREFIX
+ + "scheduler.mutable-queue-config.enabled";
+
+ public static final String RM_CONF_BACKING_STORE_CLASS = RM_PREFIX
+ + "configuration.store.class";
+ public static final String RM_SCHEDULER_CONFIGURATION_PROVIDER_CLASS = RM_PREFIX
+ + "scheduler.provider-class";
+
+ public static final String RM_CONF_MUTATION_POLICY = RM_PREFIX
+ + "configuration.mutation.policy";
+
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
+ "authorization-provider";
private static final List RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index a8ba740..f2d7055 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -481,6 +481,21 @@ enum SignalContainerCommandProto {
FORCEFUL_SHUTDOWN = 3;
}
+enum SchedulerOperationTypeProto {
+ QUEUE_ADD = 1;
+ QUEUE_REMOVE = 2;
+ QUEUE_UPDATE = 3;
+}
+
+message QueueConfigurationMutationProto {
+ optional string queueName = 1;
+ optional SchedulerOperationTypeProto operationType = 2 [default = QUEUE_UPDATE];
+ repeated StringStringMapProto configuration = 3;
+}
+
+message SchedulerConfigurationMutationProto {
+ repeated QueueConfigurationMutationProto queueMutation = 1;
+}
////////////////////////////////////////////////////////////////////////
////// From reservation_protocol /////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index df3c852..fb1f554 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -277,6 +277,14 @@ message UpdateApplicationTimeoutsResponseProto {
repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1;
}
+message SchedulerConfigurationMutationRequestProto {
+ optional SchedulerConfigurationMutationProto scheduler_mutation = 1;
+}
+
+message SchedulerConfigurationMutationResponseProto {
+}
+
+
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java
new file mode 100644
index 0000000..e66898e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation;
+import org.apache.hadoop.yarn.api.records.impl.pb.SchedulerConfigurationMutationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProto;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class SchedulerConfigurationMutationRequestPBImpl extends SchedulerConfigurationMutationRequest {
+ SchedulerConfigurationMutationRequestProto proto = SchedulerConfigurationMutationRequestProto
+ .getDefaultInstance();
+ SchedulerConfigurationMutationRequestProto.Builder builder = null;
+
+ boolean viaProto = false;
+
+ private SchedulerConfigurationMutation mutation = null;
+
+ public SchedulerConfigurationMutationRequestPBImpl() {
+ builder = SchedulerConfigurationMutationRequestProto.newBuilder();
+ }
+
+ public SchedulerConfigurationMutationRequestPBImpl(SchedulerConfigurationMutationRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SchedulerConfigurationMutationRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (mutation != null) {
+ builder.setSchedulerMutation(convertToProtoFormat(mutation));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SchedulerConfigurationMutationRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public SchedulerConfigurationMutation getSchedulerConfigurationMutation() {
+ SchedulerConfigurationMutationRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.mutation != null) {
+ return this.mutation;
+ }
+ if (!p.hasSchedulerMutation()) {
+ return null;
+ }
+ this.mutation = convertFromProtoFormat(p.getSchedulerMutation());
+ return this.mutation;
+ }
+
+ @Override
+ public void setSchedulerConfigurationMutation(
+ SchedulerConfigurationMutation schedulerConfigurationMutation) {
+ maybeInitBuilder();
+ if (schedulerConfigurationMutation == null) {
+ builder.clearSchedulerMutation();
+ }
+ this.mutation = schedulerConfigurationMutation;
+ }
+
+ private SchedulerConfigurationMutationPBImpl convertFromProtoFormat(SchedulerConfigurationMutationProto p) {
+ return new SchedulerConfigurationMutationPBImpl(p);
+ }
+
+ private SchedulerConfigurationMutationProto convertToProtoFormat(SchedulerConfigurationMutation t) {
+ return ((SchedulerConfigurationMutationPBImpl) t).getProto();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java
new file mode 100644
index 0000000..06d11c1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationResponseProto;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class SchedulerConfigurationMutationResponsePBImpl extends SchedulerConfigurationMutationResponse {
+ SchedulerConfigurationMutationResponseProto proto =
+ SchedulerConfigurationMutationResponseProto.getDefaultInstance();
+ SchedulerConfigurationMutationResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public SchedulerConfigurationMutationResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ public SchedulerConfigurationMutationResponsePBImpl() {
+ builder = SchedulerConfigurationMutationResponseProto.newBuilder();
+ }
+
+ public SchedulerConfigurationMutationResponsePBImpl(
+ SchedulerConfigurationMutationResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index ab283e7..0183e15 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -41,9 +41,11 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulerOperationType;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@@ -65,6 +67,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerOperationTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
@@ -244,6 +247,18 @@ public static QueueACL convertFromProtoFormat(QueueACLProto e) {
return QueueACL.valueOf(e.name().replace(QUEUE_ACL_PREFIX, ""));
}
+ /*
+ * SchedulerOperationType
+ */
+ private static String SCHED_CONF_PREFIX = "QUEUE_";
+ public static SchedulerOperationType convertFromProtoFormat(
+ SchedulerOperationTypeProto e) {
+ return SchedulerOperationType.valueOf(e.name().replace(SCHED_CONF_PREFIX, ""));
+ }
+ public static SchedulerOperationTypeProto convertToProtoFormat(
+ SchedulerOperationType e) {
+ return SchedulerOperationTypeProto.valueOf(SCHED_CONF_PREFIX + e.name());
+ }
/*
* ApplicationAccessType
@@ -396,6 +411,19 @@ public static ContainerStatusPBImpl convertFromProtoFormat(
}
/*
+ * QueueConfigurationMutation
+ */
+ public static QueueConfigurationMutationPBImpl convertFromProtoFormat(
+ YarnProtos.QueueConfigurationMutationProto p) {
+ return new QueueConfigurationMutationPBImpl(p);
+ }
+
+ public static YarnProtos.QueueConfigurationMutationProto
+ convertToProtoFormat(QueueConfigurationMutation t) {
+ return ((QueueConfigurationMutationPBImpl) t).getProto();
+ }
+
+ /*
* ContainerId
*/
public static ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java
new file mode 100644
index 0000000..8bc1f0f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation;
+import org.apache.hadoop.yarn.api.records.SchedulerOperationType;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerOperationTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+@Private
+@Unstable
+public class QueueConfigurationMutationPBImpl extends QueueConfigurationMutation {
+
+ QueueConfigurationMutationProto proto = QueueConfigurationMutationProto.getDefaultInstance();
+ QueueConfigurationMutationProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private SchedulerOperationType opType;
+ private Map configurationMutation;
+
+ public QueueConfigurationMutationPBImpl() {
+ builder = QueueConfigurationMutationProto.newBuilder();
+ }
+
+ public QueueConfigurationMutationPBImpl(QueueConfigurationMutationProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ @Override
+ public String getQueueName() {
+ QueueConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.hasQueueName()) ? p.getQueueName() : null;
+ }
+
+ @Override
+ public void setQueueName(String queueName) {
+ maybeInitBuilder();
+ if (queueName == null) {
+ builder.clearQueueName();
+ return;
+ }
+ builder.setQueueName(queueName);
+ }
+
+ @Override
+ public SchedulerOperationType getOperationType() {
+ QueueConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.opType != null) {
+ return this.opType;
+ }
+ if (!p.hasOperationType()) {
+ return SchedulerOperationType.UPDATE;
+ }
+ this.opType = convertFromProtoFormat(p.getOperationType());
+ return this.opType;
+ }
+
+ @Override
+ public void setOperationType(SchedulerOperationType operationType) {
+ maybeInitBuilder();
+ if (operationType == null) {
+ builder.clearOperationType();
+ return;
+ }
+ builder.setOperationType(convertToProtoFormat(operationType));
+ }
+
+ @Override
+ public Map getConfigurationMutation() {
+ initConfigurationMutation();
+ return this.configurationMutation;
+ }
+
+ private void initConfigurationMutation() {
+ if (this.configurationMutation != null) {
+ return;
+ }
+ QueueConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder;
+ List confs = p.getConfigurationList();
+ this.configurationMutation = new HashMap();
+
+ for (StringStringMapProto c : confs) {
+ this.configurationMutation.put(c.getKey(), c.getValue());
+ }
+ }
+
+ @Override
+ public void setConfigurationMutation(Map confs) {
+ if (confs == null) {
+ return;
+ }
+ initConfigurationMutation();
+ this.configurationMutation.clear();
+ this.configurationMutation.putAll(confs);
+ }
+
+ public QueueConfigurationMutationProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = QueueConfigurationMutationProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.configurationMutation != null) {
+ addConfigurationMutationToProto();
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void addConfigurationMutationToProto() {
+ maybeInitBuilder();
+ builder.clearConfiguration();
+ if (configurationMutation == null) {
+ return;
+ }
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator keyIter = configurationMutation.keySet().iterator();
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StringStringMapProto next() {
+ String key = keyIter.next();
+ String value = configurationMutation.get(key);
+ if (value == null) {
+ value = "";
+ }
+ return StringStringMapProto.newBuilder().setKey(key)
+ .setValue((value)).build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keyIter.hasNext();
+ }
+ };
+ }
+ };
+ builder.addAllConfiguration(iterable);
+ }
+
+ private SchedulerOperationType convertFromProtoFormat(SchedulerOperationTypeProto q) {
+ return ProtoUtils.convertFromProtoFormat(q);
+ }
+
+ private SchedulerOperationTypeProto convertToProtoFormat(SchedulerOperationType type) {
+ return ProtoUtils.convertToProtoFormat(type);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java
new file mode 100644
index 0000000..51520e3
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation;
+import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+@Private
+@Unstable
+public class SchedulerConfigurationMutationPBImpl extends SchedulerConfigurationMutation {
+
+ SchedulerConfigurationMutationProto proto = SchedulerConfigurationMutationProto.getDefaultInstance();
+ SchedulerConfigurationMutationProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List queueMutations;
+
+ public SchedulerConfigurationMutationPBImpl() {
+ builder = SchedulerConfigurationMutationProto.newBuilder();
+ }
+
+ public SchedulerConfigurationMutationPBImpl(SchedulerConfigurationMutationProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public SchedulerConfigurationMutationProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.queueMutations != null) {
+ addQueueMutationsToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = SchedulerConfigurationMutationProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addQueueMutationsToProto() {
+ maybeInitBuilder();
+ builder.clearQueueMutation();
+ if (this.queueMutations == null)
+ return;
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+ Iterator iter = queueMutations
+ .iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public QueueConfigurationMutationProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllQueueMutation(iterable);
+ }
+
+ @Override
+ public List getQueueMutations() {
+ initQueueMutations();
+ return this.queueMutations;
+ }
+
+ private void initQueueMutations() {
+ if (this.queueMutations != null) {
+ return;
+ }
+ SchedulerConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder;
+ List mutations = p.getQueueMutationList();
+ this.queueMutations = new ArrayList<>();
+ for (QueueConfigurationMutationProto mutationProto : mutations) {
+ queueMutations.add(convertFromProtoFormat(mutationProto));
+ }
+ }
+
+ @Override
+ public void setQueueMutations(List mutations) {
+ if (mutations == null) {
+ return;
+ }
+ initQueueMutations();
+ this.queueMutations.clear();
+ this.queueMutations.addAll(mutations);
+ }
+
+ private QueueConfigurationMutation convertFromProtoFormat(QueueConfigurationMutationProto p) {
+ return ProtoUtils.convertFromProtoFormat(p);
+ }
+
+ private QueueConfigurationMutationProto convertToProtoFormat(QueueConfigurationMutation q) {
+ return ProtoUtils.convertToProtoFormat(q);
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 10323d5..5eccba3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -30,6 +30,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
@@ -89,6 +91,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SchedulerConfigurationMutationRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SchedulerConfigurationMutationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
@@ -123,6 +127,7 @@
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
@@ -138,6 +143,7 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.Token;
@@ -169,12 +175,14 @@
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.QueueConfigurationMutationPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.QueueUserACLInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.SchedulerConfigurationMutationPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
@@ -203,12 +211,14 @@
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueUserACLInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
@@ -289,6 +299,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
@@ -399,6 +411,10 @@ public static void setup() throws Exception {
generateByNewInstance(RollbackResponse.class);
generateByNewInstance(CommitResponse.class);
generateByNewInstance(ApplicationTimeout.class);
+ generateByNewInstance(QueueConfigurationMutation.class);
+ generateByNewInstance(SchedulerConfigurationMutation.class);
+ generateByNewInstance(SchedulerConfigurationMutationRequest.class);
+ generateByNewInstance(SchedulerConfigurationMutationResponse.class);
}
@Test
@@ -1144,4 +1160,28 @@ public void testExecutionTypeRequestPBImpl() throws Exception {
validatePBImplRecord(ExecutionTypeRequestPBImpl.class,
ExecutionTypeRequestProto.class);
}
+
+ @Test
+ public void testSchedulerConfigurationMutationPBImpl() throws Exception {
+ validatePBImplRecord(SchedulerConfigurationMutationPBImpl.class,
+ SchedulerConfigurationMutationProto.class);
+ }
+
+ @Test
+ public void testQueueConfigurationMutationPBImpl() throws Exception {
+ validatePBImplRecord(QueueConfigurationMutationPBImpl.class,
+ QueueConfigurationMutationProto.class);
+ }
+
+ @Test
+ public void testSchedulerConfigurationMutationRequestPBImpl() throws Exception {
+ validatePBImplRecord(SchedulerConfigurationMutationRequestPBImpl.class,
+ SchedulerConfigurationMutationRequestProto.class);
+ }
+
+ @Test
+ public void testSchedulerConfigurationMutationResponsePBImpl() throws Exception {
+ validatePBImplRecord(SchedulerConfigurationMutationResponsePBImpl.class,
+ SchedulerConfigurationMutationResponseProto.class);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index f584c94..b8596ae 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -85,6 +85,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -511,4 +513,11 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
throws YarnException, IOException {
throw new NotImplementedException();
}
+
+ @Override
+ public SchedulerConfigurationMutationResponse mutateSchedulerConfiguration(
+ SchedulerConfigurationMutationRequest request)
+ throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 6985d65..9cab724 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -126,6 +126,11 @@
jaxb-api
+ org.apache.derby
+ derby
+ 10.10.2.0
+
+
org.codehaus.jettison
jettison
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index c375887..0f82fce 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -102,6 +102,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -1786,6 +1788,23 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
return response;
}
+ @Override
+ public SchedulerConfigurationMutationResponse mutateSchedulerConfiguration(
+ SchedulerConfigurationMutationRequest request) throws IOException, YarnException {
+ SchedulerConfigurationMutationResponse response = recordFactory
+ .newRecordInstance(SchedulerConfigurationMutationResponse.class);
+
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ LOG.info("Error getting UGI ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+ this.rmContext.getMutableConfigurationManager().updateConfiguration(callerUGI, request);
+ return response;
+ }
+
private UserGroupInformation getCallerUgi(ApplicationId applicationId,
String operation) throws YarnException {
UserGroupInformation callerUGI;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 26ef5ac..588768d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.conf.MutableConfigurationManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
@@ -40,7 +41,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -156,4 +156,8 @@ void setRMDelegatedNodeLabelsUpdater(
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
String getHAZookeeperConnectionState();
+
+ MutableConfigurationManager getMutableConfigurationManager();
+
+ void setMutableConfigurationManager(MutableConfigurationManager mutableConfMgr);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index a452f95..d1b0cea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.conf.MutableConfigurationManager;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
@@ -44,7 +45,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -82,6 +82,8 @@
private final Object haServiceStateLock = new Object();
+ private MutableConfigurationManager mutableConfMgr;
+
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@@ -522,4 +524,14 @@ public String getHAZookeeperConnectionState() {
return elector.getZookeeperConnectionState();
}
}
+
+ @Override
+ public MutableConfigurationManager getMutableConfigurationManager() {
+ return this.mutableConfMgr;
+ }
+
+ @Override
+ public void setMutableConfigurationManager(MutableConfigurationManager mutableConfMgr) {
+ this.mutableConfMgr = mutableConfMgr;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java
new file mode 100644
index 0000000..84608d4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.conf;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class CapacitySchedulerMutationPolicy implements ConfigurationMutationPolicy {
+
+ @Override
+ public boolean applyPolicy(UserGroupInformation ugi, Map confUpdate, Set confRemove) {
+ return true;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java
new file mode 100644
index 0000000..225c942
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.conf;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public interface ConfigurationMutationPolicy {
+
+ public boolean applyPolicy(UserGroupInformation ugi, Map confUpdate, Set confRemove);
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java
new file mode 100644
index 0000000..f08e46f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.conf;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation;
+import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation;
+import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.conf.store.DerbyConfigurationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.conf.store.YarnConfigurationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+public class MutableConfigurationManager extends LocalConfigurationProvider {
+
+ private YarnConfigurationStore confStore;
+ private ConfigurationMutationPolicy mutationPolicy;
+
+ private Configuration activeConfiguration;
+
+ private RMContext rmContext;
+ private Configuration conf;
+
+ private int activeId;
+
+ private static final Log LOG = LogFactory.getLog(MutableConfigurationManager.class);
+
+ @Override
+ public void initInternal(Configuration conf) throws Exception {
+ Class extends YarnConfigurationStore> storeClass = conf
+ .getClass(YarnConfiguration.RM_CONF_BACKING_STORE_CLASS,
+ DerbyConfigurationStore.class, YarnConfigurationStore.class);
+ this.confStore = ReflectionUtils.newInstance(storeClass, conf);
+ LOG.info("Initialized configuration store with " + storeClass);
+ this.confStore.initialize(conf, flattenConf(YarnConfiguration.CS_CONFIGURATION_FILE));
+ // For now, only support mutations for capacity scheduler configuration.
+ Class extends ConfigurationMutationPolicy> policyClass =
+ conf.getClass(YarnConfiguration.RM_CONF_MUTATION_POLICY, CapacitySchedulerMutationPolicy.class)
+ .asSubclass(ConfigurationMutationPolicy.class);
+ this.mutationPolicy = ReflectionUtils.newInstance(policyClass, conf);
+ LOG.info("Initialized configuration mutation policy " + policyClass);
+ this.conf = conf;
+ this.activeId = 0;
+ }
+
+ @Override
+ public void closeInternal() throws Exception {
+
+ }
+
+ public void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
+ }
+
+ public Configuration getConfigurationObject() {
+ if (activeConfiguration == null) {
+ activeConfiguration = getConfFromStore();
+ }
+ return activeConfiguration;
+ }
+
+ public synchronized void updateConfiguration(UserGroupInformation ugi, SchedulerConfigurationMutationRequest mutationRequest)
+ throws IOException {
+ SchedulerConfigurationMutation schedMutation = mutationRequest.getSchedulerConfigurationMutation();
+ if (schedMutation != null) {
+ Map confUpdate = new HashMap<>();
+ Set confRemove = new HashSet<>();
+ for (QueueConfigurationMutation mutation : schedMutation.getQueueMutations()) {
+ String queueName = mutation.getQueueName();
+ switch (mutation.getOperationType()) {
+ case ADD:
+ // For now, add queues with capacity 0.
+ confUpdate.put(CapacitySchedulerConfiguration.PREFIX + queueName + CapacitySchedulerConfiguration.DOT +
+ CapacitySchedulerConfiguration.CAPACITY, "0");
+ String siblingKey = CapacitySchedulerConfiguration.PREFIX + queueName.substring(0, queueName.lastIndexOf('.')) + CapacitySchedulerConfiguration.DOT
+ + CapacitySchedulerConfiguration.QUEUES;
+ Collection siblingQueues =
+ activeConfiguration.getStringCollection(siblingKey);
+ siblingQueues.add(queueName.substring(queueName.lastIndexOf('.') + 1));
+ confUpdate.put(siblingKey, Joiner.on(',').join(siblingQueues));
+ case UPDATE:
+ for (Map.Entry conf : mutation.getConfigurationMutation().entrySet()) {
+ String confKey = CapacitySchedulerConfiguration.PREFIX + queueName +
+ CapacitySchedulerConfiguration.DOT + conf.getKey();
+ confUpdate.put(confKey, conf.getValue());
+ }
+ break;
+ case REMOVE:
+ // Remove all configuration keys for this queue.
+ confRemove.addAll(activeConfiguration.getValByRegex(CapacitySchedulerConfiguration.PREFIX + queueName + ".*").keySet());
+ siblingKey = CapacitySchedulerConfiguration.PREFIX + queueName.substring(0, queueName.lastIndexOf('.')) + CapacitySchedulerConfiguration.DOT
+ + CapacitySchedulerConfiguration.QUEUES;
+ siblingQueues = activeConfiguration.getStringCollection(siblingKey);
+ siblingQueues.remove(queueName.substring(queueName.lastIndexOf('.') + 1));
+ if (siblingQueues.size() > 0) {
+ confUpdate.put(siblingKey, Joiner.on(',').join(siblingQueues));
+ } else {
+ confRemove.add(siblingKey);
+ }
+ break;
+ }
+ }
+ confStore.logMutations(ugi, confUpdate, confRemove, activeId++);
+ boolean success = true;
+ try {
+ tryRefresh(ugi, confUpdate, confRemove);
+ } catch (IOException e) {
+ success = false;
+ }
+ if (success) {
+ confStore.store(confUpdate, confRemove);
+ } else {
+ confStore.store(null, null);
+ }
+ }
+ }
+
+ /* Apply the configuration update in memory. If failed, revert the in memory conf and throw an exception. */
+ private void tryRefresh(UserGroupInformation ugi, Map confUpdate, Set confRemove) throws IOException {
+ if (mutationPolicy.applyPolicy(ugi, confUpdate, confRemove)) {
+ Configuration backupConf = new Configuration(activeConfiguration);
+ applyProposedMutation(confUpdate, confRemove);
+ try {
+ rmContext.getScheduler().reinitialize(conf, rmContext);
+ } catch (IOException e) {
+ activeConfiguration = backupConf;
+ LOG.warn("Failed to reinitialize", e);
+ throw e;
+ }
+ } else {
+ throw new IOException("Scheduler configuration update is invalid.");
+ }
+ }
+
+ /** Apply the configuration changes to the active configuration so the scheduler can attempt to load this
+ * configuration on reinitialization. */
+ private void applyProposedMutation(Map proposed, Set remove) {
+ for (Map.Entry entry : proposed.entrySet()) {
+ activeConfiguration.set(entry.getKey(), entry.getValue());
+ }
+ for (String k : remove) {
+ activeConfiguration.unset(k);
+ }
+ }
+
+ private Configuration getConfFromStore() {
+ Map flatConf = confStore.retrieve();
+ Configuration conf = new Configuration();
+ for (Map.Entry entry : flatConf.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ return conf;
+ }
+
+ /* Recover the configuration in the store on startup. Replays any logs that were not persisted
+ * to the store.
+ */
+ public void recover() {
+ activeId = confStore.readLastId();
+ List