diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 394454f..7d2d9b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; @@ -589,4 +591,20 @@ SignalContainerResponse signalToContainer( public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( UpdateApplicationTimeoutsRequest request) throws YarnException, IOException; + + /** + *

+ * The interface for sending scheduler configuration mutation requests for + * changing configurations at runtime. + *

+ * @param request configuration mutation request + * @return an empty response that the update has completed successfully. + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + @Idempotent + public SchedulerConfigurationMutationResponse mutateSchedulerConfiguration( + SchedulerConfigurationMutationRequest request) throws IOException, YarnException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java new file mode 100644 index 0000000..1d04d38 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationRequest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; +import org.apache.hadoop.yarn.util.Records; + + +/** + *

+ * The request sent by the client to the ResourceManager to add, + * remove, or update scheduler configurations. + *

+ *

+ * The request includes the {@link SchedulerConfigurationMutation} + *

+ * + * @see ApplicationClientProtocol#mutateSchedulerConfiguration(SchedulerConfigurationMutationRequest) + */ + +@Public +@Unstable +public abstract class SchedulerConfigurationMutationRequest { + public static SchedulerConfigurationMutationRequest newInstance( + SchedulerConfigurationMutation schedulerConfigurationMutation) { + SchedulerConfigurationMutationRequest request = + Records.newRecord(SchedulerConfigurationMutationRequest.class); + request.setSchedulerConfigurationMutation(schedulerConfigurationMutation); + return request; + } + + /** + * Get the SchedulerConfigurationMutation. + * + * @return SchedulerConfigurationMutation + */ + public abstract SchedulerConfigurationMutation getSchedulerConfigurationMutation(); + + /** + * Set the SchedulerConfigurationMutation. + * + * @param schedulerConfigurationMutation SchedulerConfigurationMutation + */ + public abstract void setSchedulerConfigurationMutation( + SchedulerConfigurationMutation schedulerConfigurationMutation); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java new file mode 100644 index 0000000..9fe3204 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SchedulerConfigurationMutationResponse.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + + +/** + *

+ * The response sent by the ResourceManager to the client on adding, + * removing, or updating scheduler configurations. + *

+ *

+ * A response without exception means that the configuration mutation has completed + * successfully. + *

+ */ +@Public +@Unstable +public abstract class SchedulerConfigurationMutationResponse { + + public static SchedulerConfigurationMutationResponse newInstance() { + SchedulerConfigurationMutationResponse response = + Records.newRecord(SchedulerConfigurationMutationResponse.class); + return response; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java new file mode 100644 index 0000000..d7f657c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueConfigurationMutation.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Map; + +/** + *

QueueConfigurationMutation encapsulates mutations for a + * single queue's configuration.

+ * + * @see SchedulerConfigurationMutation + */ +@Public +@Unstable +public abstract class QueueConfigurationMutation { + + @Private + @Unstable + public static QueueConfigurationMutation newInstance(String queueName, + SchedulerOperationType operationType, Map configuration) { + QueueConfigurationMutation mutation = Records.newRecord(QueueConfigurationMutation.class); + mutation.setQueueName(queueName); + mutation.setOperationType(operationType); + mutation.setConfigurationMutation(configuration); + return mutation; + } + + /** + * Get the queue name of this queue configuration change + * @return queue name + */ + @Private + @Unstable + public abstract String getQueueName(); + + /** + * Set the queue name of this queue configuration change + * @param queueName queue name to set + */ + @Private + @Unstable + public abstract void setQueueName(String queueName); + + /** + * Get the operation type of this queue configuration change + * @return SchedulerOperationType of this queue + * configuration change + */ + @Private + @Unstable + public abstract SchedulerOperationType getOperationType(); + + /** + * Set the SchedulerOperationType of this + * queue configuration change + * @param operationType operation type of this queue configuration change + */ + @Private + @Unstable + public abstract void setOperationType(SchedulerOperationType operationType); + + /** + * Get the params for this queue configuration change + * @return Map of key-value pairs for this configuration change + */ + @Private + @Unstable + public abstract Map getConfigurationMutation(); + + /** + * Set the params for this queue configuration change + * @param configuration map of key-values for this queue configuration change + */ + @Private + @Unstable + public abstract void setConfigurationMutation(Map configuration); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java new file mode 100644 index 0000000..4df76c6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerConfigurationMutation.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + *

SchedulerConfigurationMutation encapsulates mutations + * for scheduler configuration.

+ * @see SchedulerConfigurationMutationRequest + */ +@Public +@Unstable +public abstract class SchedulerConfigurationMutation { + + @Private + @Unstable + public static SchedulerConfigurationMutation newInstance( + List mutations) { + SchedulerConfigurationMutation mutation = + Records.newRecord(SchedulerConfigurationMutation.class); + mutation.setQueueMutations(mutations); + return mutation; + } + + /** + * Get the queue mutations of this scheduler configuration change. + * @return queue mutations + */ + @Private + @Unstable + public abstract List getQueueMutations(); + + /** + * Set list of queue mutations for this scheduler configuration change. + * @param mutations list of queue mutations + */ + @Private + @Unstable + public abstract void setQueueMutations(List mutations); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java new file mode 100644 index 0000000..7fe74b2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerOperationType.java @@ -0,0 +1,44 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Type of scheduler configuration update. + */ +@Public +@Evolving +public enum SchedulerOperationType { + /** + * Add queue and set its configurations. + */ + ADD, + + /** + * Remove queue. + */ + REMOVE, + + /** + * Update queue's configurations. + */ + UPDATE, +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7dd5ce3..4f878cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -597,6 +597,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS = "org.apache.hadoop.yarn.LocalConfigurationProvider"; + public static final String USE_MUTABLE_QUEUE_CONFIG = YARN_PREFIX + + "scheduler.mutable-queue-config.enabled"; + + public static final String RM_CONF_BACKING_STORE_CLASS = RM_PREFIX + + "configuration.store.class"; + public static final String RM_SCHEDULER_CONFIGURATION_PROVIDER_CLASS = RM_PREFIX + + "scheduler.provider-class"; + + public static final String RM_CONF_MUTATION_POLICY = RM_PREFIX + + "configuration.mutation.policy"; + public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX + "authorization-provider"; private static final List RM_SERVICES_ADDRESS_CONF_KEYS_HTTP = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index a8ba740..f2d7055 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -481,6 +481,21 @@ enum SignalContainerCommandProto { FORCEFUL_SHUTDOWN = 3; } +enum SchedulerOperationTypeProto { + QUEUE_ADD = 1; + QUEUE_REMOVE = 2; + QUEUE_UPDATE = 3; +} + +message QueueConfigurationMutationProto { + optional string queueName = 1; + optional SchedulerOperationTypeProto operationType = 2 [default = QUEUE_UPDATE]; + repeated StringStringMapProto configuration = 3; +} + +message SchedulerConfigurationMutationProto { + repeated QueueConfigurationMutationProto queueMutation = 1; +} //////////////////////////////////////////////////////////////////////// ////// From reservation_protocol ///////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index df3c852..fb1f554 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -277,6 +277,14 @@ message UpdateApplicationTimeoutsResponseProto { repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1; } +message SchedulerConfigurationMutationRequestProto { + optional SchedulerConfigurationMutationProto scheduler_mutation = 1; +} + +message SchedulerConfigurationMutationResponseProto { +} + + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java new file mode 100644 index 0000000..e66898e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationRequestPBImpl.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulerConfigurationMutationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProto; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class SchedulerConfigurationMutationRequestPBImpl extends SchedulerConfigurationMutationRequest { + SchedulerConfigurationMutationRequestProto proto = SchedulerConfigurationMutationRequestProto + .getDefaultInstance(); + SchedulerConfigurationMutationRequestProto.Builder builder = null; + + boolean viaProto = false; + + private SchedulerConfigurationMutation mutation = null; + + public SchedulerConfigurationMutationRequestPBImpl() { + builder = SchedulerConfigurationMutationRequestProto.newBuilder(); + } + + public SchedulerConfigurationMutationRequestPBImpl(SchedulerConfigurationMutationRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SchedulerConfigurationMutationRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (mutation != null) { + builder.setSchedulerMutation(convertToProtoFormat(mutation)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SchedulerConfigurationMutationRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public SchedulerConfigurationMutation getSchedulerConfigurationMutation() { + SchedulerConfigurationMutationRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.mutation != null) { + return this.mutation; + } + if (!p.hasSchedulerMutation()) { + return null; + } + this.mutation = convertFromProtoFormat(p.getSchedulerMutation()); + return this.mutation; + } + + @Override + public void setSchedulerConfigurationMutation( + SchedulerConfigurationMutation schedulerConfigurationMutation) { + maybeInitBuilder(); + if (schedulerConfigurationMutation == null) { + builder.clearSchedulerMutation(); + } + this.mutation = schedulerConfigurationMutation; + } + + private SchedulerConfigurationMutationPBImpl convertFromProtoFormat(SchedulerConfigurationMutationProto p) { + return new SchedulerConfigurationMutationPBImpl(p); + } + + private SchedulerConfigurationMutationProto convertToProtoFormat(SchedulerConfigurationMutation t) { + return ((SchedulerConfigurationMutationPBImpl) t).getProto(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java new file mode 100644 index 0000000..06d11c1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SchedulerConfigurationMutationResponsePBImpl.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationResponseProto; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class SchedulerConfigurationMutationResponsePBImpl extends SchedulerConfigurationMutationResponse { + SchedulerConfigurationMutationResponseProto proto = + SchedulerConfigurationMutationResponseProto.getDefaultInstance(); + SchedulerConfigurationMutationResponseProto.Builder builder = null; + boolean viaProto = false; + + public SchedulerConfigurationMutationResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + public SchedulerConfigurationMutationResponsePBImpl() { + builder = SchedulerConfigurationMutationResponseProto.newBuilder(); + } + + public SchedulerConfigurationMutationResponsePBImpl( + SchedulerConfigurationMutationResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index ab283e7..0183e15 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -41,9 +41,11 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SchedulerOperationType; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import 
org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -65,6 +67,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerOperationTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; @@ -244,6 +247,18 @@ public static QueueACL convertFromProtoFormat(QueueACLProto e) { return QueueACL.valueOf(e.name().replace(QUEUE_ACL_PREFIX, "")); } + /* + * SchedulerOperationType + */ + private static String SCHED_CONF_PREFIX = "QUEUE_"; + public static SchedulerOperationType convertFromProtoFormat( + SchedulerOperationTypeProto e) { + return SchedulerOperationType.valueOf(e.name().replace(SCHED_CONF_PREFIX, "")); + } + public static SchedulerOperationTypeProto convertToProtoFormat( + SchedulerOperationType e) { + return SchedulerOperationTypeProto.valueOf(SCHED_CONF_PREFIX + e.name()); + } /* * ApplicationAccessType @@ -396,6 +411,19 @@ public static ContainerStatusPBImpl convertFromProtoFormat( } /* + * QueueConfigurationMutation + */ + public static QueueConfigurationMutationPBImpl convertFromProtoFormat( + YarnProtos.QueueConfigurationMutationProto p) { + return new QueueConfigurationMutationPBImpl(p); + } + + public static YarnProtos.QueueConfigurationMutationProto + convertToProtoFormat(QueueConfigurationMutation t) { + return ((QueueConfigurationMutationPBImpl) t).getProto(); + } + + /* * ContainerId */ public static ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java new file mode 100644 index 0000000..8bc1f0f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueConfigurationMutationPBImpl.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; +import org.apache.hadoop.yarn.api.records.SchedulerOperationType; +import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto; +import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerOperationTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +@Private +@Unstable +public class QueueConfigurationMutationPBImpl extends QueueConfigurationMutation { + + QueueConfigurationMutationProto proto = QueueConfigurationMutationProto.getDefaultInstance(); + QueueConfigurationMutationProto.Builder builder = null; + boolean viaProto = false; + + private SchedulerOperationType opType; + private Map configurationMutation; + + public QueueConfigurationMutationPBImpl() { + builder = QueueConfigurationMutationProto.newBuilder(); + } + + public QueueConfigurationMutationPBImpl(QueueConfigurationMutationProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public String getQueueName() { + QueueConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasQueueName()) ? p.getQueueName() : null; + } + + @Override + public void setQueueName(String queueName) { + maybeInitBuilder(); + if (queueName == null) { + builder.clearQueueName(); + return; + } + builder.setQueueName(queueName); + } + + @Override + public SchedulerOperationType getOperationType() { + QueueConfigurationMutationProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.opType != null) { + return this.opType; + } + if (!p.hasOperationType()) { + return SchedulerOperationType.UPDATE; + } + this.opType = convertFromProtoFormat(p.getOperationType()); + return this.opType; + } + + @Override + public void setOperationType(SchedulerOperationType operationType) { + maybeInitBuilder(); + if (operationType == null) { + builder.clearOperationType(); + return; + } + builder.setOperationType(convertToProtoFormat(operationType)); + } + + @Override + public Map getConfigurationMutation() { + initConfigurationMutation(); + return this.configurationMutation; + } + + private void initConfigurationMutation() { + if (this.configurationMutation != null) { + return; + } + QueueConfigurationMutationProtoOrBuilder p = viaProto ? proto : builder; + List confs = p.getConfigurationList(); + this.configurationMutation = new HashMap(); + + for (StringStringMapProto c : confs) { + this.configurationMutation.put(c.getKey(), c.getValue()); + } + } + + @Override + public void setConfigurationMutation(Map confs) { + if (confs == null) { + return; + } + initConfigurationMutation(); + this.configurationMutation.clear(); + this.configurationMutation.putAll(confs); + } + + public QueueConfigurationMutationProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = QueueConfigurationMutationProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.configurationMutation != null) { + addConfigurationMutationToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void addConfigurationMutationToProto() { + maybeInitBuilder(); + builder.clearConfiguration(); + if (configurationMutation == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator keyIter = configurationMutation.keySet().iterator(); + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public StringStringMapProto next() { + String key = keyIter.next(); + String value = configurationMutation.get(key); + if (value == null) { + value = ""; + } + return StringStringMapProto.newBuilder().setKey(key) + .setValue((value)).build(); + } + + @Override + public boolean hasNext() { + return keyIter.hasNext(); + } + }; + } + }; + builder.addAllConfiguration(iterable); + } + + private SchedulerOperationType convertFromProtoFormat(SchedulerOperationTypeProto q) { + return ProtoUtils.convertFromProtoFormat(q); + } + + private SchedulerOperationTypeProto convertToProtoFormat(SchedulerOperationType type) { + return ProtoUtils.convertToProtoFormat(type); + } + +} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java new file mode 100644 index 0000000..51520e3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulerConfigurationMutationPBImpl.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +@Private +@Unstable +public class SchedulerConfigurationMutationPBImpl extends SchedulerConfigurationMutation { + + SchedulerConfigurationMutationProto proto = SchedulerConfigurationMutationProto.getDefaultInstance(); + SchedulerConfigurationMutationProto.Builder builder = null; + boolean viaProto = false; + + private List queueMutations; + + public SchedulerConfigurationMutationPBImpl() { + builder = SchedulerConfigurationMutationProto.newBuilder(); + } + + public SchedulerConfigurationMutationPBImpl(SchedulerConfigurationMutationProto proto) { + this.proto = proto; + viaProto = true; + } + + public SchedulerConfigurationMutationProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.queueMutations != null) { + addQueueMutationsToProto(); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SchedulerConfigurationMutationProto.newBuilder(proto); + } + viaProto = false; + } + + private void addQueueMutationsToProto() { + maybeInitBuilder(); + builder.clearQueueMutation(); + if (this.queueMutations == null) + return; + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = queueMutations + .iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public QueueConfigurationMutationProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllQueueMutation(iterable); + } + + @Override + public List getQueueMutations() { + initQueueMutations(); + return this.queueMutations; + } + + private void initQueueMutations() { + if (this.queueMutations != null) { + return; + } + SchedulerConfigurationMutationProtoOrBuilder p = viaProto ? 
proto : builder; + List mutations = p.getQueueMutationList(); + this.queueMutations = new ArrayList<>(); + for (QueueConfigurationMutationProto mutationProto : mutations) { + queueMutations.add(convertFromProtoFormat(mutationProto)); + } + } + + @Override + public void setQueueMutations(List mutations) { + if (mutations == null) { + return; + } + initQueueMutations(); + this.queueMutations.clear(); + this.queueMutations.addAll(mutations); + } + + private QueueConfigurationMutation convertFromProtoFormat(QueueConfigurationMutationProto p) { + return ProtoUtils.convertFromProtoFormat(p); + } + + private QueueConfigurationMutationProto convertToProtoFormat(QueueConfigurationMutation q) { + return ProtoUtils.convertToProtoFormat(q); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 10323d5..5eccba3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; @@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SchedulerConfigurationMutationRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SchedulerConfigurationMutationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl; @@ -123,6 +127,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueStatistics; @@ -138,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; @@ 
-169,12 +175,14 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionResourceRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.QueueConfigurationMutationPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.QueueUserACLInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulerConfigurationMutationPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; @@ -203,12 +211,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationMutationProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.QueueUserACLInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulerConfigurationMutationProto; import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; import 
org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto; import org.apache.hadoop.yarn.proto.YarnProtos.URLProto; @@ -289,6 +299,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerConfigurationMutationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto; @@ -399,6 +411,10 @@ public static void setup() throws Exception { generateByNewInstance(RollbackResponse.class); generateByNewInstance(CommitResponse.class); generateByNewInstance(ApplicationTimeout.class); + generateByNewInstance(QueueConfigurationMutation.class); + generateByNewInstance(SchedulerConfigurationMutation.class); + generateByNewInstance(SchedulerConfigurationMutationRequest.class); + generateByNewInstance(SchedulerConfigurationMutationResponse.class); } @Test @@ -1144,4 +1160,28 @@ public void testExecutionTypeRequestPBImpl() throws Exception { validatePBImplRecord(ExecutionTypeRequestPBImpl.class, ExecutionTypeRequestProto.class); } + + @Test + public void testSchedulerConfigurationMutationPBImpl() throws Exception { + validatePBImplRecord(SchedulerConfigurationMutationPBImpl.class, + SchedulerConfigurationMutationProto.class); + } + + @Test + public void testQueueConfigurationMutationPBImpl() throws Exception { + validatePBImplRecord(QueueConfigurationMutationPBImpl.class, + QueueConfigurationMutationProto.class); + } + + @Test + public void testSchedulerConfigurationMutationRequestPBImpl() 
throws Exception { + validatePBImplRecord(SchedulerConfigurationMutationRequestPBImpl.class, + SchedulerConfigurationMutationRequestProto.class); + } + + @Test + public void testSchedulerConfigurationMutationResponsePBImpl() throws Exception { + validatePBImplRecord(SchedulerConfigurationMutationResponsePBImpl.class, + SchedulerConfigurationMutationResponseProto.class); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f584c94..b8596ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -511,4 +513,11 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( throws YarnException, IOException { throw new NotImplementedException(); } + + @Override + public 
SchedulerConfigurationMutationResponse mutateSchedulerConfiguration( + SchedulerConfigurationMutationRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 6985d65..9cab724 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -126,6 +126,11 @@ jaxb-api + org.apache.derby + derby + 10.10.2.0 + + org.codehaus.jettison jettison diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c375887..0f82fce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -102,6 +102,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -1786,6 +1788,23 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( return response; } + @Override + public SchedulerConfigurationMutationResponse mutateSchedulerConfiguration( + SchedulerConfigurationMutationRequest request) throws IOException, YarnException { + SchedulerConfigurationMutationResponse response = recordFactory + .newRecordInstance(SchedulerConfigurationMutationResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + this.rmContext.getMutableConfigurationManager().updateConfiguration(callerUGI, request); + return response; + } + private UserGroupInformation getCallerUgi(ApplicationId applicationId, String operation) throws YarnException { UserGroupInformation callerUGI; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 26ef5ac..588768d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import 
org.apache.hadoop.yarn.server.resourcemanager.conf.MutableConfigurationManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -40,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -156,4 +156,8 @@ void setRMDelegatedNodeLabelsUpdater( RMAppLifetimeMonitor getRMAppLifetimeMonitor(); String getHAZookeeperConnectionState(); + + MutableConfigurationManager getMutableConfigurationManager(); + + void setMutableConfigurationManager(MutableConfigurationManager mutableConfMgr); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index a452f95..d1b0cea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import 
org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.conf.MutableConfigurationManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -44,7 +45,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -82,6 +82,8 @@ private final Object haServiceStateLock = new Object(); + private MutableConfigurationManager mutableConfMgr; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. 
@@ -522,4 +524,14 @@ public String getHAZookeeperConnectionState() { return elector.getZookeeperConnectionState(); } } + + @Override + public MutableConfigurationManager getMutableConfigurationManager() { + return this.mutableConfMgr; + } + + @Override + public void setMutableConfigurationManager(MutableConfigurationManager mutableConfMgr) { + this.mutableConfMgr = mutableConfMgr; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java new file mode 100644 index 0000000..84608d4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/CapacitySchedulerMutationPolicy.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.conf;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Default mutation policy for the capacity scheduler. Currently a
+ * placeholder that accepts every proposed configuration change from any
+ * user.
+ */
+public class CapacitySchedulerMutationPolicy implements ConfigurationMutationPolicy {
+
+  @Override
+  public boolean applyPolicy(UserGroupInformation ugi, Map confUpdate, Set confRemove) {
+    // Allows all mutations unconditionally.
+    // TODO(review): enforce queue ACL / admin checks before accepting
+    // runtime scheduler configuration updates.
+    return true;
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java
new file mode 100644
index 0000000..225c942
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/ConfigurationMutationPolicy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.security.UserGroupInformation; + +public interface ConfigurationMutationPolicy { + + public boolean applyPolicy(UserGroupInformation ugi, Map confUpdate, Set confRemove); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java new file mode 100644 index 0000000..f08e46f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/MutableConfigurationManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Joiner; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.conf.store.DerbyConfigurationStore; +import org.apache.hadoop.yarn.server.resourcemanager.conf.store.YarnConfigurationStore; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +public class MutableConfigurationManager extends LocalConfigurationProvider { + + private YarnConfigurationStore confStore; + private ConfigurationMutationPolicy mutationPolicy; + + private Configuration activeConfiguration; + + private RMContext rmContext; + private Configuration conf; + + private int activeId; + + private static final Log LOG = LogFactory.getLog(MutableConfigurationManager.class); + + @Override + public void initInternal(Configuration conf) throws Exception { + Class storeClass = conf + .getClass(YarnConfiguration.RM_CONF_BACKING_STORE_CLASS, + DerbyConfigurationStore.class, YarnConfigurationStore.class); + this.confStore = ReflectionUtils.newInstance(storeClass, conf); + LOG.info("Initialized configuration 
store with " + storeClass); + this.confStore.initialize(conf, flattenConf(YarnConfiguration.CS_CONFIGURATION_FILE)); + // For now, only support mutations for capacity scheduler configuration. + Class policyClass = + conf.getClass(YarnConfiguration.RM_CONF_MUTATION_POLICY, CapacitySchedulerMutationPolicy.class) + .asSubclass(ConfigurationMutationPolicy.class); + this.mutationPolicy = ReflectionUtils.newInstance(policyClass, conf); + LOG.info("Initialized configuration mutation policy " + policyClass); + this.conf = conf; + this.activeId = 0; + } + + @Override + public void closeInternal() throws Exception { + + } + + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + public Configuration getConfigurationObject() { + if (activeConfiguration == null) { + activeConfiguration = getConfFromStore(); + } + return activeConfiguration; + } + + public synchronized void updateConfiguration(UserGroupInformation ugi, SchedulerConfigurationMutationRequest mutationRequest) + throws IOException { + SchedulerConfigurationMutation schedMutation = mutationRequest.getSchedulerConfigurationMutation(); + if (schedMutation != null) { + Map confUpdate = new HashMap<>(); + Set confRemove = new HashSet<>(); + for (QueueConfigurationMutation mutation : schedMutation.getQueueMutations()) { + String queueName = mutation.getQueueName(); + switch (mutation.getOperationType()) { + case ADD: + // For now, add queues with capacity 0. 
+ confUpdate.put(CapacitySchedulerConfiguration.PREFIX + queueName + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.CAPACITY, "0"); + String siblingKey = CapacitySchedulerConfiguration.PREFIX + queueName.substring(0, queueName.lastIndexOf('.')) + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + Collection siblingQueues = + activeConfiguration.getStringCollection(siblingKey); + siblingQueues.add(queueName.substring(queueName.lastIndexOf('.') + 1)); + confUpdate.put(siblingKey, Joiner.on(',').join(siblingQueues)); + case UPDATE: + for (Map.Entry conf : mutation.getConfigurationMutation().entrySet()) { + String confKey = CapacitySchedulerConfiguration.PREFIX + queueName + + CapacitySchedulerConfiguration.DOT + conf.getKey(); + confUpdate.put(confKey, conf.getValue()); + } + break; + case REMOVE: + // Remove all configuration keys for this queue. + confRemove.addAll(activeConfiguration.getValByRegex(CapacitySchedulerConfiguration.PREFIX + queueName + ".*").keySet()); + siblingKey = CapacitySchedulerConfiguration.PREFIX + queueName.substring(0, queueName.lastIndexOf('.')) + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + siblingQueues = activeConfiguration.getStringCollection(siblingKey); + siblingQueues.remove(queueName.substring(queueName.lastIndexOf('.') + 1)); + if (siblingQueues.size() > 0) { + confUpdate.put(siblingKey, Joiner.on(',').join(siblingQueues)); + } else { + confRemove.add(siblingKey); + } + break; + } + } + confStore.logMutations(ugi, confUpdate, confRemove, activeId++); + boolean success = true; + try { + tryRefresh(ugi, confUpdate, confRemove); + } catch (IOException e) { + success = false; + } + if (success) { + confStore.store(confUpdate, confRemove); + } else { + confStore.store(null, null); + } + } + } + + /* Apply the configuration update in memory. If failed, revert the in memory conf and throw an exception. 
*/ + private void tryRefresh(UserGroupInformation ugi, Map confUpdate, Set confRemove) throws IOException { + if (mutationPolicy.applyPolicy(ugi, confUpdate, confRemove)) { + Configuration backupConf = new Configuration(activeConfiguration); + applyProposedMutation(confUpdate, confRemove); + try { + rmContext.getScheduler().reinitialize(conf, rmContext); + } catch (IOException e) { + activeConfiguration = backupConf; + LOG.warn("Failed to reinitialize", e); + throw e; + } + } else { + throw new IOException("Scheduler configuration update is invalid."); + } + } + + /** Apply the configuration changes to the active configuration so the scheduler can attempt to load this + * configuration on reinitialization. */ + private void applyProposedMutation(Map proposed, Set remove) { + for (Map.Entry entry : proposed.entrySet()) { + activeConfiguration.set(entry.getKey(), entry.getValue()); + } + for (String k : remove) { + activeConfiguration.unset(k); + } + } + + private Configuration getConfFromStore() { + Map flatConf = confStore.retrieve(); + Configuration conf = new Configuration(); + for (Map.Entry entry : flatConf.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + /* Recover the configuration in the store on startup. Replays any logs that were not persisted + * to the store. 
+ */ + public void recover() { + activeId = confStore.readLastId(); + List> unpersistedMutations = confStore.getMutations(activeId); + for (Map confMutation : unpersistedMutations) { + try { + Set toRemove = new HashSet<>(); + for (Iterator> itr = confMutation.entrySet().iterator(); itr.hasNext();) { + Map.Entry entry = itr.next(); + if (entry.getValue() == null) { + toRemove.add(entry.getKey()); + itr.remove(); + } + } + tryRefresh(null, confMutation, toRemove); + confStore.store(confMutation, toRemove); + } catch (IOException e) { + LOG.warn("Logged mutation was invalid.", e); + confStore.store(null, null); + } finally { + activeId++; + } + } + } + + private Map flattenConf(String resource) { + Map flatConf = new HashMap<>(); + Configuration tmp = new Configuration(false); + tmp.addResource(resource); + for (Map.Entry entry : tmp) { + flatConf.put(entry.getKey(), entry.getValue()); + } + return flatConf; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/ConfigurationType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/ConfigurationType.java new file mode 100644 index 0000000..4660550 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/ConfigurationType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf.store; + +public enum ConfigurationType { + SCHEDULER +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/DerbyConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/DerbyConfigurationStore.java new file mode 100644 index 0000000..fceeba7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/DerbyConfigurationStore.java @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf.store; + +import com.google.common.base.Joiner; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.derby.jdbc.EmbeddedConnectionPoolDataSource; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DerbyConfigurationStore implements YarnConfigurationStore { + + private static final Log LOG = + LogFactory.getLog(DerbyConfigurationStore.class); + + /** Default connection URL for retrieving values via Java stored procedures. **/ + protected static final String DEFAULT_DB_URL = "jdbc:default:connection"; + private static final String TABLE_NOT_EXISTS_CODE = "42X05"; + private static final String SHUTDOWN_ERROR_CODE = "XJ015"; + + private static final String KV_DELIM = "="; + private static final String CONF_DELIM = ";"; + + // Configuration keys in yarn-site.xml for specifying where on disk to store the + // database as well as the location of the initialization script for creating the tables + // and stored procedures. 
+ protected static final String CAPACITY_SCHEDULER_CONFIG_PATH = + "yarn.scheduler.capacity.config.path"; + private static final String CAPACITY_SCHEDULER_CONFIG_DB_INIT_SCRIPT = + "yarn.scheduler.capacity.config.db.init-script"; + + private static final String DEFAULT_CAPACITY_SCHEDULER_CONFIG_DB_PATH = "/db"; + private static final String DEFAULT_CAPACITY_SCHEDULER_CONFIG_DB_INIT_SCRIPT = "initialize.sql"; + private static final String SHUTDOWN_URL = "jdbc:derby:;shutdown=true"; + + private static String dbName; + private EmbeddedDataSource dataSrc; + + private static final String GET_SCHED_CONF = "{call GET_SCHED_CONF()}"; + private static final String SET_SCHED_CONF = "{call SET_SCHED_CONF(?)}"; + private static final String GET_LAST_ID = "{call GET_LAST_ID()}"; + private static final String SET_LAST_ID = "{call SET_LAST_ID(?)}"; + private static final String GET_PARTIAL_SCHED_CONF = "{call GET_PARTIAL_SCHED_CONF(?)}"; + private static final String ADD_LOG = "{call ADD_LOG(?,?,?,?,?)}"; + private static final String DELETE_CONF = "{call DELETE_CONF(?)}"; + private static final List PROCEDURE_CALLS = Arrays.asList(GET_SCHED_CONF, SET_SCHED_CONF, + GET_LAST_ID, SET_LAST_ID, GET_PARTIAL_SCHED_CONF, ADD_LOG, DELETE_CONF); + + private static final String GET_PROCEDURE = + "CREATE PROCEDURE GET_SCHED_CONF() " + + "DYNAMIC RESULT SETS 1 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".getSchedConf' " + + "PARAMETER STYLE JAVA"; + private static final String SET_PROCEDURE = + "CREATE PROCEDURE SET_SCHED_CONF(kv varchar(32672)) " + + "DYNAMIC RESULT SETS 0 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".setSchedConf' " + + "PARAMETER STYLE JAVA"; + private static final String GET_ID_PROCEDURE = + "CREATE PROCEDURE GET_LAST_ID() " + + "DYNAMIC RESULT SETS 1 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + 
DerbyConfigurationStore.class.getCanonicalName() + ".getLastId' " + + "PARAMETER STYLE JAVA"; + private static final String SET_ID_PROCEDURE = + "CREATE PROCEDURE SET_LAST_ID(id integer) " + + "DYNAMIC RESULT SETS 0 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".setLastId' " + + "PARAMETER STYLE JAVA"; + private static final String GET_PARTIAL_PROCEDURE = + "CREATE PROCEDURE GET_PARTIAL_SCHED_CONF(id integer) " + + "DYNAMIC RESULT SETS 1 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".getPartialSchedConf' " + + "PARAMETER STYLE JAVA"; + private static final String ADD_LOG_PROCEDURE = + "CREATE PROCEDURE ADD_LOG(id integer, ugi varchar(100), k varchar(250), v varchar(32672), isDelete boolean) " + + "DYNAMIC RESULT SETS 0 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".addLog' " + + "PARAMETER STYLE JAVA"; + private static final String DELETE_CONF_PROCEDURE = + "CREATE PROCEDURE DELETE_CONF(k varchar(32672)) " + + "DYNAMIC RESULT SETS 0 " + + "LANGUAGE JAVA " + + "READS SQL DATA " + + "EXTERNAL NAME '" + DerbyConfigurationStore.class.getCanonicalName() + ".deleteConf' " + + "PARAMETER STYLE JAVA"; + private static final List PROCEDURE_DEFINITIONS = Arrays.asList(GET_PROCEDURE, SET_PROCEDURE, + GET_ID_PROCEDURE, SET_ID_PROCEDURE, GET_PARTIAL_PROCEDURE, ADD_LOG_PROCEDURE, DELETE_CONF_PROCEDURE); + + + private static String CONF_ALL_GETTER = "select k, v from sched_conf"; + private static String CONF_GETTER = "select v from sched_conf where k = ?"; + private static String CONF_UPDATE = "update sched_conf set v = ? where k = ?"; + private static String CONF_INSERT = "insert into sched_conf(k, v) values (?, ?)"; + private static String ID_GETTER = "select id from id"; + private static String ID_SETTER = "update id set id = ? 
where id = ?"; + private static String ID_INSERT = "insert into id values (0)"; + private static String CONF_PARTIAL_GETTER = "select k, v, id, ugi, isDelete from logs where id >= ? order by id desc"; + private static String LOG_ADDER = "insert into logs values (?, ?, ?, ?, ?)"; + private static String CONF_DELETER = "delete from sched_conf where k = ?"; + + @Override + public Map retrieve() { + Map conf = new HashMap<>(); + try (Connection conn = dataSrc.getConnection(); + CallableStatement getter = conn.prepareCall(GET_SCHED_CONF);) { + ResultSet rs = null; + try { + conn.setAutoCommit(false); + boolean hasResult = getter.execute(); + rs = getter.getResultSet(); + if (hasResult) { + while (rs.next()) { + conf.put(rs.getString(1), rs.getString(2)); + } + } + } finally { + if (rs != null) { + rs.close(); + } + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return conf; + } + + @Override + public void store(Map conf, Set confRemove) { + try (Connection conn = dataSrc.getConnection(); + CallableStatement setter = conn.prepareCall(SET_SCHED_CONF); + CallableStatement idgetter = conn.prepareCall(GET_LAST_ID); + CallableStatement idsetter = conn.prepareCall(SET_LAST_ID); + CallableStatement deleter = conn.prepareCall(DELETE_CONF);) { + conn.setAutoCommit(false); + int id = -1; + boolean hasResult = idgetter.execute(); + try (ResultSet rs = idgetter.getResultSet();) { + if (hasResult) { + rs.next(); + id = rs.getInt(1); + } + } + idsetter.setInt(1, id + 1); + idsetter.execute(); + if (conf != null) { + setter.setString(1, Joiner.on(CONF_DELIM).withKeyValueSeparator(KV_DELIM).join(conf)); + setter.execute(); + } + if (confRemove != null) { + deleter.setString(1, Joiner.on(CONF_DELIM).join(confRemove)); + deleter.execute(); + } + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + protected static void getAttribute(Connection conn, PreparedStatement ps, + ResultSet[] data) throws SQLException { + 
// Do not close ResultSet! ResultSet returned through stored procedure cannot + // be closed within procedure method. + ResultSet rs = ps.executeQuery(); + boolean hasData = rs.isBeforeFirst(); + if (hasData) { + data[0] = rs; + } + } + + public static void getSchedConf(ResultSet[] data) + throws SQLException { + try (Connection conn = DriverManager.getConnection(DEFAULT_DB_URL);) { + PreparedStatement ps = conn.prepareStatement(CONF_ALL_GETTER, + ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); + getAttribute(conn, ps, data); + ps.closeOnCompletion(); + } + } + + public static void setSchedConf(String kv) throws SQLException { + try ( + Connection conn = DriverManager.getConnection(DerbyConfigurationStore.getDBName()); + PreparedStatement select = conn.prepareStatement(CONF_GETTER); + PreparedStatement update = conn.prepareStatement(CONF_UPDATE); + PreparedStatement insert = conn.prepareStatement(CONF_INSERT);) { + for (String conf : kv.split(CONF_DELIM)) { + String[] parts = conf.split(KV_DELIM); + String key = parts[0]; + String value = parts[1]; + select.setString(1, key); + boolean hasData = select.executeQuery().next(); + if (hasData) { + update.setString(1, value); + update.setString(2, key); + update.addBatch(); + } else { + insert.setString(1, key); + insert.setString(2, value); + insert.addBatch(); + } + } + update.executeBatch(); + insert.executeBatch(); + } + } + + public static void getPartialSchedConf(int id, ResultSet[] data) throws SQLException { + try (Connection conn = DriverManager.getConnection(DEFAULT_DB_URL);) { + PreparedStatement getter = conn.prepareStatement(CONF_PARTIAL_GETTER, + ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); + getter.setInt(1, id); + getAttribute(conn, getter, data); + getter.closeOnCompletion(); + } + } + public static void addLog(int id, String ugi, String k, String v, boolean isDelete) throws SQLException { + try (Connection conn = 
DriverManager.getConnection(DerbyConfigurationStore.getDBName()); + PreparedStatement adder = conn.prepareStatement(LOG_ADDER);) { + adder.setString(1, k); + adder.setString(2, v); + adder.setString(3, ugi); + adder.setInt(4, id); + adder.setBoolean(5, isDelete); + adder.execute(); + } + } + public static void deleteConf(String k) throws SQLException { + try (Connection conn = DriverManager.getConnection(DerbyConfigurationStore.getDBName()); + PreparedStatement deleter = conn.prepareStatement(CONF_DELETER);) { + for (String conf : k.split(CONF_DELIM)) { + deleter.setString(1, conf); + deleter.addBatch(); + } + deleter.executeBatch(); + } + } + public static void getLastId(ResultSet[] data) throws SQLException { + try (Connection conn = DriverManager.getConnection(DEFAULT_DB_URL);) { + PreparedStatement ps = conn.prepareStatement(ID_GETTER, + ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE); + getAttribute(conn, ps, data); + ps.closeOnCompletion(); + } + } + + public static void setLastId(int id) throws SQLException { + try (Connection conn = DriverManager.getConnection(DerbyConfigurationStore.getDBName()); + PreparedStatement select = conn.prepareStatement(ID_GETTER); + PreparedStatement update = conn.prepareStatement(ID_SETTER); + PreparedStatement insert = conn.prepareStatement(ID_INSERT);) { + ResultSet rs = select.executeQuery(); + if (rs.next()) { + int oldId = rs.getInt(1); + update.setInt(1, id); + update.setInt(2, oldId); + update.execute(); + } else { + insert.execute(); + } + } + } + + public DerbyConfigurationStore() { + attachShutDownHook(); + } + + /** + * Shutdown hook for shutting down the database. 
+ */ + private void attachShutDownHook() { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOG.info("Derby JDBC program ending."); + boolean gotSQLExc = false; + try { + DriverManager.getConnection(SHUTDOWN_URL); + } catch (SQLException se) { + if (se.getSQLState().equals(SHUTDOWN_ERROR_CODE)) { + gotSQLExc = true; + } + } catch (Exception e) { + LOG.warn("Base configuration provider did not close properly."); + } + if (!gotSQLExc) { + LOG.warn("Database did not shut down normally"); + } else { + LOG.info("Database shut down normally"); + } + } + }); + } + + public static String getDBName() { + return dbName; + } + + /** + * Utility method for checking the existence of tables. + * @param conn + * @return Whether the tables exist + * @throws SQLException + */ + private static boolean checkForTables(Connection conn) throws SQLException { + try (Statement s = conn.createStatement()) { + LOG.info("Trying to find table"); + s.execute("select v from sched_conf where k is null"); + s.execute("select v from logs where id = 0"); + s.execute("select id from id"); + } catch (SQLException sqle) { + String theError = sqle.getSQLState(); + // If table exists will get - WARNING 02000: No row was found + if (theError.equals(TABLE_NOT_EXISTS_CODE)) { + // Table does not exist + LOG.info("Table not found."); + DatabaseMetaData dmd = conn.getMetaData(); + EmbeddedDBUtil.dropSchema(dmd, "APP"); + return false; + } else { + throw sqle; + } + } + LOG.info("Found table"); + return true; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private boolean checkForTableSchema(Connection conn) throws SQLException { + try { + LOG.info("Check for any schema mismatch between the deployed db and the " + + "definition of it..."); + for (String procedure : PROCEDURE_CALLS) { + try (CallableStatement checker = conn.prepareCall(procedure);) { } + } + } catch (SQLSyntaxErrorException e) { + String theError = e.getSQLState(); + LOG.info("Check schema 
SQLSyntaxErrorException " + theError, e); + LOG.info("Schema mismatch detected. Truncate tables and stored procedures " + + "for regenerating from config file."); + DatabaseMetaData dmd = conn.getMetaData(); + EmbeddedDBUtil.dropSchema(dmd, "APP"); + return false; + } + return true; + } + + /** + * Initializes the configuration provider. It first checks for the existence of tables. + * If they do not exist, this method reads the initialization script and creates + * all the tables and stored procedures. + * {@inheritDoc} + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void initialize(Configuration conf, Map schedConf) { + String configPath = conf.get(CAPACITY_SCHEDULER_CONFIG_PATH); + if (configPath == null) { + throw new IllegalArgumentException("Missing configuration parameter " + + CAPACITY_SCHEDULER_CONFIG_PATH); + } + String dbPath = configPath + DEFAULT_CAPACITY_SCHEDULER_CONFIG_DB_PATH; + dbName = "jdbc:derby:" + dbPath; + synchronized (this) { + if (dataSrc == null) { + dataSrc = new EmbeddedConnectionPoolDataSource(); + dataSrc.setDatabaseName(dbPath); + dataSrc.setCreateDatabase("create"); + } + } + + String initScriptPath = conf.get(CAPACITY_SCHEDULER_CONFIG_DB_INIT_SCRIPT, + DEFAULT_CAPACITY_SCHEDULER_CONFIG_DB_INIT_SCRIPT); + boolean regenerateDB = false; + try (Connection conn = dataSrc.getConnection();) { + regenerateDB = !checkForTables(conn) || !checkForTableSchema(conn); + } catch (SQLException e) { + LOG.warn(e); + throw new RuntimeException(e); + } + + try (Connection conn = dataSrc.getConnection(); + Statement stmt = conn.createStatement(); + BufferedReader reader = new File(initScriptPath).exists() ? 
+ new BufferedReader(new InputStreamReader( + new FileInputStream(initScriptPath), StandardCharsets.UTF_8)) : + new BufferedReader(new InputStreamReader( + this.getClass().getClassLoader() + .getResourceAsStream(DEFAULT_CAPACITY_SCHEDULER_CONFIG_DB_INIT_SCRIPT))) + ) { + LOG.info("Connected to database " + dbName); + // Check for if the config tables are missing in the embedded DB or if there's + // schema mismatch in the deployed embedded DB. If so, create the tables. + if (regenerateDB) { + LOG.info("Creating tables"); + String createString = readAsString(reader); + for (String sql : createString.split(";")) { + stmt.addBatch(sql); + } + + for (String procedure : PROCEDURE_DEFINITIONS) { + stmt.addBatch(procedure); + } + stmt.executeBatch(); + + store(schedConf, null); + } + } catch (SQLException e) { + LOG.warn(e); + throw new RuntimeException(e); + } catch (FileNotFoundException e) { + LOG.warn(e); + throw new RuntimeException(e); + } catch (IOException e) { + LOG.warn(e); + throw new RuntimeException(e); + } + } + + private String readAsString(BufferedReader reader) throws IOException { + StringBuilder sb = new StringBuilder(); + String line = reader.readLine(); + while (line != null) { + if (!line.startsWith("--")) { + sb.append(line); + } + line = reader.readLine(); + } + return sb.toString(); + } + + @Override + public int readLastId() { + try (Connection conn = dataSrc.getConnection(); + CallableStatement getter = conn.prepareCall(GET_LAST_ID);) { + ResultSet rs = null; + try { + conn.setAutoCommit(false); + boolean hasResult = getter.execute(); + rs = getter.getResultSet(); + if (hasResult) { + rs.next(); + return rs.getInt(1); + } + } finally { + if (rs != null) { + rs.close(); + } + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return -1; + } + + @Override + public List> getMutations(int lastId) { + List> mutations = new ArrayList<>(); + try (Connection conn = dataSrc.getConnection(); + CallableStatement getter = 
conn.prepareCall(GET_PARTIAL_SCHED_CONF);) { + ResultSet rs = null; + try { + conn.setAutoCommit(false); + getter.setInt(1, lastId); + boolean hasResult = getter.execute(); + rs = getter.getResultSet(); + if (hasResult) { + int id = lastId - 1; + while (rs.next()) { + if (rs.getInt(3) != id) { + mutations.add(new HashMap<>()); + id = rs.getInt(3); + } + if (!rs.getBoolean(5)) { + mutations.get(mutations.size() - 1).put(rs.getString(1), rs.getString(2)); + } else { + mutations.get(mutations.size() - 1).put(rs.getString(1), null); + } + } + } + } finally { + if (rs != null) { + rs.close(); + } + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return mutations; + } + + @Override + public void logMutations(UserGroupInformation ugi, Map mutations, Set removes, int id) { + try (Connection conn = dataSrc.getConnection(); + CallableStatement adder = conn.prepareCall(ADD_LOG);) { + conn.setAutoCommit(false); + adder.setInt(1, id); + adder.setString(2, ugi.getShortUserName()); + for (Map.Entry entry : mutations.entrySet()) { + adder.setString(3, entry.getKey()); + adder.setString(4, entry.getValue()); + adder.setBoolean(5, false); + adder.addBatch(); + } + + for (String remove : removes) { + adder.setString(3, remove); + adder.setString(4, null); + adder.setBoolean(5, true); + adder.addBatch(); + } + adder.executeBatch(); + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/EmbeddedDBUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/EmbeddedDBUtil.java new file mode 100644 index 0000000..a52bb93 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/EmbeddedDBUtil.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf.store; + +import java.sql.BatchUpdateException; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class EmbeddedDBUtil { + + public static void dropSchema(DatabaseMetaData dmd, String schema) throws SQLException { + try ( + ResultSet funcs = dmd.getFunctions(null, schema, null); + ResultSet procedures = dmd.getProcedures(null, schema, null); + ResultSet tables = dmd.getTables(null, schema, null, new String[] {"TABLE"}); + Statement s = dmd.getConnection().createStatement(); + ) { + dropUsingDMD(s, funcs, schema, "FUNCTION_NAME", "FUNCTION"); + dropUsingDMD(s, procedures, schema, "PROCEDURE_NAME", "PROCEDURE"); + dropUsingDMD(s, tables, schema, "TABLE_NAME", "TABLE"); + } + } + + private static void dropUsingDMD(Statement s, ResultSet rs, String schema, + String column, String dropType) throws SQLException { + // Generate the drop statement for all objects in the specified dropType + String dropStmtPrefix = "DROP " + dropType + " "; + List dropDdls = new ArrayList(); + while (rs.next()) { + String objectName = rs.getString(column); + dropDdls.add(dropStmtPrefix + escape(schema, objectName)); + } + if (dropDdls.isEmpty()) { + return; + } + + // Execute the batch drop statements + s.clearBatch(); + for (String ddl : dropDdls) { + s.addBatch(ddl); + } + int[] results; + boolean hadError = true; + try { + results = s.executeBatch(); + hadError = false; + } catch (BatchUpdateException e) { + results = e.getUpdateCounts(); + } + + // Verify if any drop statement failed, set successful executions to null + boolean didDrop = false; + for (int i = 0; i < results.length; i++) { + int result = results[i]; + if (result == Statement.EXECUTE_FAILED) { + hadError = true; + } else if (result == Statement.SUCCESS_NO_INFO || result >= 0) { + didDrop = 
true; + dropDdls.set(i, null); + } + } + s.clearBatch(); + if (didDrop) { + s.getConnection().commit(); + } + + // Re-execute the failed drop statement until there are none left or none + // succeed. This will cover the case where dependency between different objects + // exist, and a different drop oder will allow all objects to be dropped. + if (hadError) { + do { + hadError = false; + didDrop = false; + for (ListIterator it = dropDdls.listIterator(); it.hasNext();) { + String sql = it.next(); + if (sql != null) { + try { + s.executeUpdate(sql); + it.set(null); + didDrop = true; + } catch (SQLException e) { + hadError = true; + } + } + } + if (didDrop) { + s.getConnection().commit(); + } + } while (hadError && didDrop); + } + } + + private static String escape(String schema, String object) { + return "\"" + schema.replaceAll("\"", "\"\"") + "\".\"" + + object.replaceAll("\"", "\"\"") + "\""; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/YarnConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/YarnConfigurationStore.java new file mode 100644 index 0000000..bc6c599 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/conf/store/YarnConfigurationStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.conf.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface YarnConfigurationStore { + + // Initialize the store with configuration key-values schedConf. + public void initialize(Configuration conf, Map schedConf); + + // Retrieve key-value map of configuration in store. + public Map retrieve(); + + // Update configuration store with conf key-value mappings. Remove configuration keys in confRemove. + public void store(Map conf, Set confRemove); + + // Read the txn id for the last configuration update stored in the configuration store. + public int readLastId(); + + // Get an ordered list of logged configuration updates with txnid >= lastId. + // The list is in chronological order of submitted configuration updates. + // Configuration updates within the same txn are in the same map in the list. + public List> getMutations(int lastId); + + // Log the configuration updates (mutations/removes) with txnid id. 
+ public void logMutations(UserGroupInformation ugi, Map mutations, Set removes, int id); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ced310e..fb5d856 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.conf.MutableConfigurationManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import 
org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; @@ -167,6 +170,7 @@ private int offswitchPerHeartbeatLimit; + private boolean useStore; @Override public void setConf(Configuration conf) { @@ -297,7 +301,7 @@ public void setRMContext(RMContext rmContext) { @VisibleForTesting void initScheduler(Configuration configuration) throws - IOException { + Exception { try { writeLock.lock(); this.conf = loadCapacitySchedulerConfiguration(configuration); @@ -372,12 +376,27 @@ private void startSchedulerThreads() { public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); super.serviceInit(conf); + useStore = conf.getBoolean(YarnConfiguration.USE_MUTABLE_QUEUE_CONFIG, false); + if (useStore) { + // For now, create the MutableConfigurationManager in CapacityScheduler. When supporting other + // mutable configurations, should create this in ResourceManager. + if (!(rmContext.getConfigurationProvider() instanceof MutableConfigurationManager)) { + throw new YarnRuntimeException("Store is enabled, but configuration provider is not MutableConfigurationManager."); + } + MutableConfigurationManager mutableConfMgr = new MutableConfigurationManager(); + mutableConfMgr.init(conf); + mutableConfMgr.setRMContext(rmContext); + rmContext.setMutableConfigurationManager(mutableConfMgr); + } initScheduler(configuration); } @Override public void serviceStart() throws Exception { startSchedulerThreads(); + if (useStore) { + rmContext.getMutableConfigurationManager().recover(); + } super.serviceStart(); } @@ -1869,15 +1888,21 @@ public boolean isSystemAppsLimitReached() { private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( Configuration configuration) throws IOException { try { - InputStream CSInputStream = - this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(configuration, - YarnConfiguration.CS_CONFIGURATION_FILE); - if (CSInputStream != null) { - 
configuration.addResource(CSInputStream); + if (!useStore) { + InputStream CSInputStream = + this.rmContext.getConfigurationProvider() + .getConfigurationInputStream(configuration, + YarnConfiguration.CS_CONFIGURATION_FILE); + if (CSInputStream != null) { + configuration.addResource(CSInputStream); + return new CapacitySchedulerConfiguration(configuration, false); + } + return new CapacitySchedulerConfiguration(configuration, true); + } else { + Configuration csConf = rmContext.getMutableConfigurationManager().getConfigurationObject(); + configuration.addResource(csConf); return new CapacitySchedulerConfiguration(configuration, false); } - return new CapacitySchedulerConfiguration(configuration, true); } catch (Exception e) { throw new IOException(e); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index bd0602b..6481242 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SchedulerConfigurationMutationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; 
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; @@ -113,12 +114,15 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueConfigurationMutation; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SchedulerConfigurationMutation; +import org.apache.hadoop.yarn.api.records.SchedulerOperationType; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -2614,4 +2618,96 @@ public Void run() throws IOException, YarnException { app.getApplicationTimeouts().get(appTimeout.getTimeoutType())); return Response.status(Status.OK).entity(timeout).build(); } + + @PUT + @Path("/conf/scheduler/mutate") + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response mutateSchedulerConfiguration(SchedulerConfigurationMutationInfo mutationInfo, + @Context HttpServletRequest hsr) + throws AuthorizationException, InterruptedException { + init(); + + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + if (callerUGI == null) { + throw new AuthorizationException( + "Unable to obtain user name, user not authenticated"); + } + + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { + return Response.status(Status.FORBIDDEN) + .entity("The default 
static user cannot carry out this operation.") + .build(); + } + + SchedulerConfigurationMutationRequest request = createConfigurationMutationRequest(mutationInfo); + try { + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, YarnException { + rm.getClientRMService().mutateSchedulerConfiguration(request); + return null; + } + }); + } catch (IOException e) { + throw new BadRequestException(e); + } + return Response.status(Status.OK).build(); + } + + private SchedulerConfigurationMutationRequest createConfigurationMutationRequest(SchedulerConfigurationMutationInfo info) { + List queueMutations = new ArrayList<>(); + for (AddQueueInfo addInfo : info.getAddQueueInfo()) { + getQueueConfigurationMutation(addInfo, queueMutations, ""); + } + for (RemoveQueueInfo removeInfo : info.getRemoveQueueInfo()) { + getQueueConfigurationMutation(removeInfo, queueMutations, ""); + } + for (UpdateQueueInfo updateInfo : info.getUpdateQueueInfo()) { + getQueueConfigurationMutation(updateInfo, queueMutations, ""); + } + SchedulerConfigurationMutation schedMutation = SchedulerConfigurationMutation.newInstance(queueMutations); + SchedulerConfigurationMutationRequest req = SchedulerConfigurationMutationRequest.newInstance(schedMutation); + return req; + } + + private void getQueueConfigurationMutation(AddQueueInfo addInfo, + List queueMutations, String parentQueue) { + String queue = parentQueue.isEmpty() ? addInfo.getQueue() : (parentQueue + "." 
+ addInfo.getQueue()); + QueueConfigurationMutation addMutation = QueueConfigurationMutation.newInstance(queue, + SchedulerOperationType.ADD, addInfo.getParams()); + queueMutations.add(addMutation); + if (addInfo.getAddQueueInfo() != null) { + for (AddQueueInfo childAddInfo : addInfo.getAddQueueInfo()) { + getQueueConfigurationMutation(childAddInfo, queueMutations, queue); + } + } + } + + private void getQueueConfigurationMutation(RemoveQueueInfo removeInfo, + List queueMutations, String parentQueue) { + String queue = parentQueue.isEmpty() ? removeInfo.getQueue() : (parentQueue + "." + removeInfo.getQueue()); + QueueConfigurationMutation removeMutation = QueueConfigurationMutation.newInstance(queue, + SchedulerOperationType.REMOVE, Collections.emptyMap()); + queueMutations.add(removeMutation); + if (removeInfo.getRemoveQueueInfo() != null) { + for (RemoveQueueInfo childRemoveInfo : removeInfo.getRemoveQueueInfo()) { + getQueueConfigurationMutation(childRemoveInfo, queueMutations, queue); + } + } + } + + private void getQueueConfigurationMutation(UpdateQueueInfo updateInfo, + List queueMutations, String parentQueue) { + String queue = parentQueue.isEmpty() ? updateInfo.getQueueName() : (parentQueue + "." 
+ updateInfo.getQueueName()); + QueueConfigurationMutation updateMutation = QueueConfigurationMutation.newInstance(queue, + SchedulerOperationType.UPDATE, updateInfo.getParams()); + queueMutations.add(updateMutation); + if (updateInfo.getUpdateNestedQueues() != null) { + for (UpdateQueueInfo childUpdateInfo : updateInfo.getUpdateNestedQueues()) { + getQueueConfigurationMutation(childUpdateInfo, queueMutations, queue); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java new file mode 100644 index 0000000..5d4caeb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "add") +@XmlAccessorType(XmlAccessType.FIELD) +public class AddQueueInfo { + + @XmlElement(name = "name") + private String queue; + + private HashMap params = new HashMap<>(); + + @XmlElement(name = "add") + private ArrayList addQueueInfo = new ArrayList<>(); + + public AddQueueInfo() { } + + public String getQueue() { + return this.queue; + } + + public HashMap getParams() { + return params; + } + + public ArrayList getAddQueueInfo() { + return addQueueInfo; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java new file mode 100644 index 0000000..b25be68 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; + +@XmlRootElement(name = "remove") +@XmlAccessorType(XmlAccessType.FIELD) +public class RemoveQueueInfo { + + @XmlElement(name = "name") + private String queue; + + @XmlElement(name = "remove") + private ArrayList removeQueueInfo = new ArrayList<>(); + + public RemoveQueueInfo() { + } + + public String getQueue() { + return this.queue; + } + + public ArrayList getRemoveQueueInfo() { + return removeQueueInfo; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationMutationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationMutationInfo.java new file mode 100644 index 0000000..dd8a9d1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationMutationInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "schedConf") +@XmlAccessorType(XmlAccessType.FIELD) +public class SchedulerConfigurationMutationInfo { + + @XmlElement(name = "add") + private ArrayList addQueueInfo = new ArrayList<>(); + + @XmlElement(name = "remove") + private ArrayList removeQueueInfo = new ArrayList<>(); + + @XmlElement(name = "update") + private ArrayList updateQueueInfo = new ArrayList<>(); + + public SchedulerConfigurationMutationInfo() { + // JAXB needs this + } + + public ArrayList getAddQueueInfo() { + return addQueueInfo; + } + + public ArrayList getRemoveQueueInfo() { + return removeQueueInfo; + } + + public ArrayList getUpdateQueueInfo() { + return updateQueueInfo; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java new file mode 100644 index 0000000..1f6368a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "update") +@XmlAccessorType(XmlAccessType.FIELD) +public class UpdateQueueInfo { + + @XmlElement(name = "name") + private String queueName; + + private HashMap params = new HashMap<>(); + + @XmlElement(name = "update") + private ArrayList updateNestedQueues = new ArrayList<>(); + + public UpdateQueueInfo() { + } + + public String getQueueName() { + return this.queueName; + } + + public HashMap getParams() { + return params; + } + + public ArrayList getUpdateNestedQueues() { + return updateNestedQueues; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/initialize.sql hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/initialize.sql new file mode 100644 index 0000000..96c71e2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/initialize.sql @@ -0,0 +1,37 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- This SQL script defines the table schema for the embedded database storing +-- queue configurations. The classes under package +-- org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.param +-- will access this embedded database to retrieve and update the queue configurations. + +create table sched_conf ( +k varchar(250), +v varchar(32672), +primary key (k)); + +create table logs ( +k varchar(250), +v varchar(32672), +ugi varchar(100), +id integer, +isDelete boolean +); + +create table id ( +id integer +);