diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/CsiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/CsiConstants.java new file mode 100644 index 00000000000..ae290d28f28 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/CsiConstants.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi; + +/** + * CSI constants. + */ +public final class CsiConstants { + + private CsiConstants() { + // Hide the constructor for this constant class. + } + + public static final String CSI_VOLUME_NAME = "volume.name"; + public static final String CSI_VOLUME_ID = "volume.id"; + public static final String CSI_VOLUME_CAPABILITY = "volume.capability"; + public static final String CSI_DRIVER_NAME = "driver.name"; + public static final String CSI_VOLUME_MOUNT = "volume.mount"; + public static final String CSI_VOLUME_DRIVER_NAME = "driver.name"; + public static final String CSI_VOLUME_ACCESS_MODE = "volume.accessMode"; + + public static final String CSI_VOLUME_RESOURCE_TAG = "system:csi-volume"; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeCapability.java new file mode 100644 index 00000000000..92ec6d702b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeCapability.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi; + +import com.google.common.base.Strings; +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeException; + +/** + * Volume capability that specified in a volume resource request, + * a CSI driver validates a volume according its capability. + */ +public final class VolumeCapability { + + private final long minCapacity; + private final long maxCapacity; + private final String unit; + + private VolumeCapability(long minCapacity, long maxCapacity, String unit) { + this.minCapacity = minCapacity; + this.maxCapacity = maxCapacity; + this.unit = unit; + } + + public long getMinCapacity() { + return minCapacity; + } + + public long getMaxCapacity() { + return maxCapacity; + } + + public String getUnit() { + return unit; + } + + @Override + public String toString() { + return "MinCapability: " + minCapacity + unit + + ", MaxCapability: " + maxCapacity + unit; + } + + public static VolumeCapabilityBuilder newBuilder() { + return new VolumeCapabilityBuilder(); + } + + /** + * The builder used to build a VolumeCapability instance. + */ + public static class VolumeCapabilityBuilder { + // An invalid default value implies this value must be set + private long minCap = -1L; + private long maxCap = Long.MAX_VALUE; + private String unit; + + public VolumeCapabilityBuilder minCapacity(long minCapacity) { + this.minCap = minCapacity; + return this; + } + + public VolumeCapabilityBuilder maxCapacity(long maxCapacity) { + this.maxCap = maxCapacity; + return this; + } + + public VolumeCapabilityBuilder unit(String capacityUnit) { + this.unit = capacityUnit; + return this; + } + + public VolumeCapability build() throws InvalidVolumeException { + VolumeCapability capability = new VolumeCapability(minCap, maxCap, unit); + validateCapability(capability); + return capability; + } + + private void validateCapability(VolumeCapability capability) + throws InvalidVolumeException { + if (capability.getMinCapacity() < 0) { + throw new InvalidVolumeException("Invalid volume capability range," + + " minimal capability must not be less than 0. Capability: " + + capability.toString()); + } + if (Strings.isNullOrEmpty(capability.getUnit())) { + throw new InvalidVolumeException("Invalid volume capability," + + " capability unit is missing. Capability: " + + capability.toString()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeId.java new file mode 100644 index 00000000000..d62fc8c2f0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeId.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.util.StringUtils; + +/** + * Unique ID for a volume. This may or may not come from a storage system, + * YARN depends on this ID to recognized volumes and manage their states. + */ +public class VolumeId { + + private final String volumeId; + + public VolumeId(String volumeId) { + this.volumeId = volumeId; + } + + public String getId() { + return this.volumeId; + } + + @Override + public String toString() { + return this.volumeId; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof VolumeId)) { + return false; + } + return StringUtils.equalsIgnoreCase(volumeId, + ((VolumeId) obj).getId()); + } + + @Override + public int hashCode() { + HashCodeBuilder hc = new HashCodeBuilder(); + hc.append(volumeId); + return hc.toHashCode(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeSpec.java new file mode 100644 index 00000000000..5d1d58bc24a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/VolumeSpec.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi; + +import com.google.common.base.Strings; +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeException; + +import java.util.ArrayList; +import java.util.List; + +/** + * VolumeSpec defines all valid info for a CSI compatible volume. + */ +public class VolumeSpec { + + private VolumeId volumeId; + private String volumeName; + private VolumeCapability volumeCapability; + private String driverName; + private String mountPoint; + + private void setVolumeId(VolumeId volumeId) { + this.volumeId = volumeId; + } + + private void setVolumeName(String volumeName) { + this.volumeName = volumeName; + } + + private void setVolumeCapability(VolumeCapability capability) { + this.volumeCapability = capability; + } + + private void setDriverName(String driverName) { + this.driverName = driverName; + } + + private void setMountPoint(String mountPoint) { + this.mountPoint = mountPoint; + } + + public boolean isPrevisionedVolume() { + return this.volumeId != null; + } + + public VolumeId getVolumeId() { + return volumeId; + } + + public String getVolumeName() { + return volumeName; + } + + public VolumeCapability getVolumeCapability() { + return volumeCapability; + } + + public String getDriverName() { + return driverName; + } + + public String getMountPoint() { + return mountPoint; + } + + public static VolumeSpecBuilder newBuilder() { + return new VolumeSpecBuilder(); + } + + public static List fromResource(ResourceInformation resourceInfo) + throws InvalidVolumeException { + List volumeSpecs = new ArrayList<>(); + if (resourceInfo != null) { + if (resourceInfo.getTags() != null && resourceInfo.getTags() + .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) { + VolumeSpecBuilder builder = VolumeSpec.newBuilder(); + // Volume ID + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_ID)) { + String id = resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_ID); + builder.volumeId(new VolumeId(id)); + } + // Volume name + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_NAME)) { + builder.volumeName(resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_NAME)); + } + // CSI driver name + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_DRIVER_NAME)) { + builder.driverName(resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_DRIVER_NAME)); + } + // Mount path + if (resourceInfo.getAttributes() + .containsKey(CsiConstants.CSI_VOLUME_MOUNT)) { + builder.mountPoint(resourceInfo.getAttributes() + .get(CsiConstants.CSI_VOLUME_MOUNT)); + } + // Volume capability + VolumeCapability volumeCapability = + VolumeCapability.newBuilder().minCapacity(resourceInfo.getValue()) + .unit(resourceInfo.getUnits()) + .build(); + builder.capability(volumeCapability); + volumeSpecs.add(builder.build()); + } + } + return volumeSpecs; + } + + @Override + public String toString() { + JsonObject json = new JsonObject(); + if (!Strings.isNullOrEmpty(volumeName)) { + json.addProperty(CsiConstants.CSI_VOLUME_NAME, volumeName); + } + if (volumeId != null) { + json.addProperty(CsiConstants.CSI_VOLUME_ID, volumeId.toString()); + } + if (volumeCapability != null) { + json.addProperty(CsiConstants.CSI_VOLUME_CAPABILITY, + volumeCapability.toString()); + } + if (!Strings.isNullOrEmpty(driverName)) { + json.addProperty(CsiConstants.CSI_DRIVER_NAME, driverName); + } + if (!Strings.isNullOrEmpty(mountPoint)) { + json.addProperty(CsiConstants.CSI_VOLUME_MOUNT, mountPoint); + } + return json.toString(); + } + + /** + * The builder used to build a VolumeSpec instance. + */ + public static class VolumeSpecBuilder { + // @CreateVolumeRequest + // The suggested name for the storage space. + private VolumeId volumeId; + private String volumeName; + private VolumeCapability volumeCapability; + private String driverName; + private String mountPoint; + + public VolumeSpecBuilder volumeId(VolumeId volumeId) { + this.volumeId = volumeId; + return this; + } + + public VolumeSpecBuilder volumeName(String name) { + this.volumeName = name; + return this; + } + + public VolumeSpecBuilder driverName(String driverName) { + this.driverName = driverName; + return this; + } + + public VolumeSpecBuilder mountPoint(String mountPoint) { + this.mountPoint = mountPoint; + return this; + } + + public VolumeSpecBuilder capability(VolumeCapability capability) { + this.volumeCapability = capability; + return this; + } + + public VolumeSpec build() throws InvalidVolumeException { + VolumeSpec spec = new VolumeSpec(); + spec.setVolumeId(volumeId); + spec.setVolumeName(volumeName); + spec.setVolumeCapability(volumeCapability); + spec.setDriverName(driverName); + spec.setMountPoint(mountPoint); + validate(spec); + return spec; + } + + private void validate(VolumeSpec spec) throws InvalidVolumeException { + // Volume name OR Volume ID must be set + if (Strings.isNullOrEmpty(spec.getVolumeName()) + && spec.getVolumeId() == null) { + throw new InvalidVolumeException("Invalid volume, both volume name" + + " and ID are missing from the spec. Volume spec: " + + spec.toString()); + } + // Volume capability must be set + if (spec.getVolumeCapability() == null) { + throw new InvalidVolumeException("Invalid volume, volume capability" + + " is missing. Volume spec: " + spec.toString()); + } + // CSI driver name must be set + if (Strings.isNullOrEmpty(spec.getDriverName())) { + throw new InvalidVolumeException("Invalid volume, the csi-driver name" + + " is missing. Volume spec: " + spec.toString()); + } + // Mount point must be set + if (Strings.isNullOrEmpty(spec.getMountPoint())) { + throw new InvalidVolumeException("Invalid volume, the mount point" + + " is missing. Volume spec: " + spec.toString()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeException.java new file mode 100644 index 00000000000..ad69c289f65 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi.exception; + +/** + * This exception is thrown when a volume is found not valid. + */ +public class InvalidVolumeException extends VolumeException { + + public InvalidVolumeException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeStateException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeStateException.java new file mode 100644 index 00000000000..984177872a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/InvalidVolumeStateException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi.exception; + +/** + * Invalid volume state exception. + */ +public class InvalidVolumeStateException extends VolumeException { + + public InvalidVolumeStateException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/VolumeException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/VolumeException.java new file mode 100644 index 00000000000..bdb5f96e967 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/VolumeException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.volume.csi.exception; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Base class for all volume related exceptions. + */ +public class VolumeException extends YarnException { + + public VolumeException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/package-info.java new file mode 100644 index 00000000000..707802d9aaf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/exception/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains volume related exception classes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.volume.csi.exception; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/package-info.java new file mode 100644 index 00000000000..0ead6ce27d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/volume/csi/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains common classes for CSI volume. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.volume.csi; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 35620781677..f829a4cbbc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -121,6 +122,7 @@ private MultiNodeSortingManager multiNodeSortingManager; private ProxyCAManager proxyCAManager; + private VolumeManager volumeManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -569,4 +571,16 @@ public ProxyCAManager getProxyCAManager() { public void setProxyCAManager(ProxyCAManager proxyCAManager) { this.proxyCAManager = proxyCAManager; } + + @Private + @Unstable + public VolumeManager getVolumeManager() { + return this.volumeManager; + } + + @Private + @Unstable + public void setVolumeManager(VolumeManager volumeManager) { + this.volumeManager = volumeManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f06befe2eae..4e9846c731b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; /** * Context of the ResourceManager. @@ -193,4 +194,8 @@ void setMultiNodeSortingManager( ProxyCAManager getProxyCAManager(); void setProxyCAManager(ProxyCAManager proxyCAManager); + + VolumeManager getVolumeManager(); + + void setVolumeManager(VolumeManager volumeManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 48f74d368e9..ab71134c93f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.util.Clock; @@ -648,6 +649,17 @@ public ProxyCAManager getProxyCAManager() { public void setProxyCAManager(ProxyCAManager proxyCAManager) { this.activeServiceContext.setProxyCAManager(proxyCAManager); } + + @Override + public VolumeManager getVolumeManager() { + return activeServiceContext.getVolumeManager(); + } + + @Override + public void setVolumeManager(VolumeManager volumeManager) { + this.activeServiceContext.setVolumeManager(volumeManager); + } + // Note: Read java doc before adding any services over here. @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index a89069a5247..8840d5786f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -109,6 +109,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManagerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -136,6 +139,7 @@ import java.security.PrivilegedExceptionAction; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -845,6 +849,16 @@ protected void serviceInit(Configuration configuration) throws Exception { addIfService(systemServiceManager); } + // Add volume manager to RM context when it is necessary + String[] amsProcessorList = conf.getStrings( + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS); + if (amsProcessorList != null && Arrays.stream(amsProcessorList).anyMatch( + s -> VolumeAMSProcessor.class.getName().equals(s))) { + VolumeManager volumeManager = new VolumeManagerImpl(); + rmContext.setVolumeManager(volumeManager); + addIfService(volumeManager); + } + super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java new file mode 100644 index 00000000000..d8160df63db --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; + +import java.util.Map; + +/** + * Main interface for volume manager that manages all volumes. + * Volume manager talks to a CSI controller plugin to handle the + * volume operations before it is available to be published on + * any node manager. + */ +public interface VolumeManager { + + /** + * Check if a pre-provisioned volume has capabilities that request wants. + * A volume is valid only when all capability criteria are satisfied. + *
+   * {@code
+   * // CSI spec
+   * rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
+   *   returns (ValidateVolumeCapabilitiesResponse) {}
+   * }
+   * 
+ * @param volumeId volume ID + * @throws VolumeException + */ + void validateVolume(VolumeId volumeId, VolumeCapability capability) + throws VolumeException; + + /** + * Publish a volume to one or more nodes from the CSI controller plugin. + * + *
+   * {@code
+   * // CSI spec
+   * rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
+   *   returns (ControllerPublishVolumeResponse) {}
+   * }
+   * 
+ * + * @param volumeId + * @param capability + * @param readOnly + * @param secrets + * @param volumeAttributes + * @throws VolumeException + */ + void controllerPublishVolume(VolumeId volumeId, VolumeCapability capability, + boolean readOnly, Map secrets, + Map volumeAttributes) throws VolumeException; + + /** + * @return all known volumes and their states. + */ + VolumeStates getVolumeStates(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java new file mode 100644 index 00000000000..2ace8242220 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeStateException; +import org.apache.hadoop.yarn.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeEventType; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * A service manages all volumes. + */ +public class VolumeManagerImpl extends AbstractService + implements VolumeManager{ + + private static final Log LOG = LogFactory.getLog(VolumeManagerImpl.class); + + + private final VolumeStates volumeStates; + private final ExecutorService executor; + private final AsyncDispatcher dispatcher; + + public VolumeManagerImpl() { + super(VolumeManagerImpl.class.getName()); + this.volumeStates = new VolumeStates(); + this.executor = HadoopExecutors.newSingleThreadExecutor(); + this.dispatcher = new AsyncDispatcher("CSI Volume Manager Dispatcher"); + this.dispatcher.register(VolumeEventType.class, + new VolumeEventDispatcher()); + } + + private class VolumeEventDispatcher implements EventHandler { + @Override + public void handle(VolumeEvent event) { + Volume v = volumeStates.getVolume(event.getVolumeId()); + if (v != null) { + v.handle(event); + } else { + LOG.warn("Event " + event + + " cannot be handled because the given volume " + + event.getVolumeId() + " is not found."); + } + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.dispatcher.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.dispatcher.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + this.dispatcher.stop(); + executor.shutdown(); + super.serviceStop(); + } + + + @Override + public VolumeStates getVolumeStates() { + return this.volumeStates; + } + + @Override + public void validateVolume(VolumeId volumeId, VolumeCapability capability) + throws VolumeException { + Volume volume; + // Check if volume is already managed at given state + if (volumeStates.hasVolume(volumeId)) { + volume = volumeStates.getVolume(volumeId); + if (volume.getVolumeState() != VolumeState.PUBLISHED) { + // volume exists but the state is not expected + throw new InvalidVolumeStateException( + "Invalid volume state, volumeId: " + volumeId + + ", expected state: " + VolumeState.PUBLISHED + + ", actual state: " + volume.getVolumeState()); + } + } else { + volume = new VolumeImpl(volumeId); + } + // Validate the volume with the driver + LOG.info("Validating volume " + volumeId); + volumeStates.addVolume(volume); + dispatcher.getEventHandler() + .handle(new ValidateVolumeEvent(volume, capability)); + LOG.info("Validating volume request has been dispatched"); + } + + @Override + public void controllerPublishVolume(VolumeId volumeId, + VolumeCapability capability, boolean readOnly, + Map secrets, Map volumeAttributes) + throws VolumeException { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java new file mode 100644 index 00000000000..9d85f10c93b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.volume.csi.VolumeId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Volume manager states, including all managed volumes and their states. + */ +public class VolumeStates { + + private final Map volumeStates; + + public VolumeStates() { + this.volumeStates = new ConcurrentHashMap<>(); + } + + protected Volume getVolume(VolumeId volumeId) { + return volumeStates.get(volumeId); + } + + protected boolean hasVolume(VolumeId volumeId) { + return volumeStates.containsKey(volumeId); + } + + protected void addVolume(Volume volume) { + volumeStates.put(volume.getVolumeId(), volume); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/ValidateVolumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/ValidateVolumeEvent.java new file mode 100644 index 00000000000..c6e0cb95355 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/ValidateVolumeEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; + +/** + * Validate volume capability with the CSI driver. + */ +public class ValidateVolumeEvent extends VolumeEvent { + + private VolumeCapability capability; + + public ValidateVolumeEvent(Volume volume, VolumeCapability volumeCapability) { + super(volume, VolumeEventType.VALIDATE_VOLUME_EVENT); + this.capability = volumeCapability; + } + + public VolumeCapability getVolumeCapability() { + return this.capability; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java new file mode 100644 index 00000000000..984fbfb7753 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.volume.csi.VolumeId; + +/** + * Major volume interface at RM's view, it maintains the volume states and + * state transition according to the CSI volume lifecycle. + */ +public interface Volume extends EventHandler { + + VolumeState getVolumeState(); + + VolumeId getVolumeId(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEvent.java new file mode 100644 index 00000000000..63e7ba7b4cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEvent.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.volume.csi.VolumeId; + +/** + * Base volume event class that used to trigger volume state transitions. + */ +public class VolumeEvent extends AbstractEvent { + + private Volume volume; + + public VolumeEvent(Volume volume, VolumeEventType volumeEventType) { + super(volumeEventType, System.currentTimeMillis()); + this.volume = volume; + } + + public Volume getVolume() { + return this.volume; + } + + public VolumeId getVolumeId() { + return this.volume.getVolumeId(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEventType.java new file mode 100644 index 00000000000..0cd0ce5cd14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeEventType.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +/** + * Volume events. + */ +public enum VolumeEventType { + VALIDATE_VOLUME_EVENT, + CREATE_VOLUME_EVENT, + CONTROLLER_PUBLISH_VOLUME_EVENT, + NODE_STAGE_VOLUME_EVENT, + NODE_PUBLISH_VOLUME_EVENT, + NODE_UNPUBLISH_VOLUME_EVENT, + NODE_UNSTAGE_VOLUME_EVENT, + CONTROLLER_UNPUBLISH_VOLUME_EVENT, + DELETE_VOLUME +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java new file mode 100644 index 00000000000..04e20b00640 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.volume.csi.VolumeId; + +import java.util.EnumSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This class maintains the volume states and state transition + * according to the CSI volume lifecycle. Volume states are stored in + * {@link org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeStates} + * class. + */ +public class VolumeImpl implements Volume { + + private static final Log LOG = LogFactory.getLog(VolumeImpl.class); + + private final Lock readLock; + private final Lock writeLock; + private final StateMachine stateMachine; + + private final VolumeId volumeId; + + public VolumeImpl(VolumeId volumeId) { + ReadWriteLock lock = new ReentrantReadWriteLock(); + this.writeLock = lock.writeLock(); + this.readLock = lock.readLock(); + this.volumeId = volumeId; + this.stateMachine = createVolumeStateFactory() + .make(this, VolumeState.NEW, new VolumeStateListener()); + } + + private StateMachineFactory createVolumeStateFactory() { + return new StateMachineFactory(VolumeState.NEW) + .addTransition( + VolumeState.NEW, + EnumSet.of(VolumeState.VALIDATED, VolumeState.UNAVAILABLE), + VolumeEventType.VALIDATE_VOLUME_EVENT, + new ValidateVolumeTransition()) + .installTopology(); + } + + @Override + public VolumeState getVolumeState() { + try { + readLock.lock(); + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + @Override + public VolumeId getVolumeId() { + try { + readLock.lock(); + return this.volumeId; + } finally { + readLock.unlock(); + } + } + + private static class ValidateVolumeTransition + implements MultipleArcTransition { + @Override + public VolumeState transition(VolumeImpl volume, + VolumeEvent volumeEvent) { + return VolumeState.VALIDATED; + } + } + + @Override + public void handle(VolumeEvent event) { + try { + this.writeLock.lock(); + VolumeId volumeId = event.getVolumeId(); + + if (volumeId == null) { + // This should not happen, safely ignore the event + LOG.warn("Unexpected volume event received, event type is " + + event.getType().name() + ", but the volumeId is null."); + return; + } + + LOG.info("Processing volume event, type=" + event.getType().name() + + ", volumeId=" + volumeId.toString()); + + VolumeState oldState = null; + VolumeState newState = null; + try { + oldState = stateMachine.getCurrentState(); + newState = stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitionException e) { + LOG.warn("Can't handle this event at current state: Current: [" + + oldState + "], eventType: [" + event.getType() + "]," + + " volumeId: [" + volumeId + "]", e); + } + + if (newState != null && oldState != newState) { + LOG.info("VolumeImpl " + volumeId + " transitioned from " + oldState + + " to " + newState); + } + }finally { + this.writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java new file mode 100644 index 00000000000..57d567ddb16 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +/** + * Volume states + * Volume states are defined in the CSI spec, see more in volume lifecycle. + */ +public enum VolumeState { + NEW, + VALIDATED, + CREATED, + NODE_READY, + PUBLISHED, + DESTROYED, + UNAVAILABLE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeStateListener.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeStateListener.java new file mode 100644 index 00000000000..6972bb1006a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeStateListener.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.yarn.state.StateTransitionListener; + +/** + * Volume state listener, triggered when volume state transits. + * Currently no opt is added here. + */ +public class VolumeStateListener + implements StateTransitionListener { + + @Override + public void preTransition(VolumeImpl volume, VolumeState beforeState, + VolumeEvent eventToBeProcessed) { + // do nothing + } + + @Override + public void postTransition(VolumeImpl volume, + VolumeState beforeState, VolumeState afterState, + VolumeEvent processedEvent) { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java new file mode 100644 index 00000000000..a9dd3896643 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage volume lifecycle. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java new file mode 100644 index 00000000000..5d71617d52b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage CSI volumes. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java new file mode 100644 index 00000000000..83e0115f38f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; +import org.apache.hadoop.yarn.volume.csi.VolumeSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * AMS processor that handles volume resource requests. + * This processor incorporates with + * {@link org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager} + * to manage the lifecycle of volumes. + * + */ +public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor { + + private static final Logger LOG = LoggerFactory + .getLogger(VolumeAMSProcessor.class); + + private ApplicationMasterServiceProcessor nextAMSProcessor; + private VolumeManager volumeManager; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing CSI volume processor"); + this.nextAMSProcessor = nextProcessor; + this.volumeManager = ((RMContext) amsContext).getVolumeManager(); + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + this.nextAMSProcessor.registerApplicationMaster(applicationAttemptId, + request, response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + LOG.info("Handling allocate in VolumeAMSProcessor"); + + // extract volume resource from the request + // now just support schedulingRequest and pre-provisioned volume + List requests = request.getSchedulingRequests(); + for (SchedulingRequest req : requests) { + Resource totalResource = req.getResourceSizing().getResources(); + List resourceList = totalResource + .getAllResourcesListCopy(); + for (ResourceInformation resourceInformation : resourceList) { + List volumes = VolumeSpec.fromResource(resourceInformation); + for (VolumeSpec vs : volumes) { + if (vs.isPrevisionedVolume()) { + LOG.info("Handling volume spec: " + vs.toString()); + volumeManager.validateVolume(vs.getVolumeId(), + vs.getVolumeCapability()); + } + } + } + } + + LOG.info("Route to next processor"); + nextAMSProcessor.allocate(appAttemptId, request, response); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, + request, response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java new file mode 100644 index 00000000000..788a4179a80 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains AMS processor class for volume handling. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java new file mode 100644 index 00000000000..3f9bc938f79 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage storage volumes in YARN. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapability.java new file mode 100644 index 00000000000..072f02a3c76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapability.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for volume capability. + */ +public class TestVolumeCapability { + + @Test(expected = InvalidVolumeException.class) + public void testInvalidMinCapability() throws InvalidVolumeException { + VolumeCapability.newBuilder() + .minCapacity(-1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingMinCapability() throws InvalidVolumeException { + VolumeCapability.newBuilder() + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingUnit() throws InvalidVolumeException { + VolumeCapability.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .build(); + } + + @Test + public void testGetVolumeCapability() throws InvalidVolumeException { + VolumeCapability vc = VolumeCapability.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + Assert.assertEquals(0L, vc.getMinCapacity()); + Assert.assertEquals(5L, vc.getMaxCapacity()); + Assert.assertEquals("Gi", vc.getUnit()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java new file mode 100644 index 00000000000..c1b5de3ed5b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for volume lifecycle management. + */ +public class TestVolumeLifecycle { + + @Test + public void testVolumeValidation() throws InvalidVolumeException { + VolumeImpl v = new VolumeImpl(new VolumeId("vol_1234")); + Assert.assertEquals(VolumeState.NEW, v.getVolumeState()); + VolumeCapability volumeCap = + VolumeCapability.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .unit("Gi") + .build(); + v.handle(new ValidateVolumeEvent(v, volumeCap)); + Assert.assertEquals(VolumeState.VALIDATED, v.getVolumeState()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeManagerImpl.java new file mode 100644 index 00000000000..f6b0e79d980 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeManagerImpl.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; +import org.junit.Test; + +/** + * Test cases for volume manager service. + */ +public class TestVolumeManagerImpl { + + @Test + public void testVolumeManager() throws VolumeException { + VolumeManagerImpl volumeManager = new VolumeManagerImpl(); + volumeManager.init(new Configuration()); + volumeManager.start(); + VolumeCapability volumeCap = + VolumeCapability.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .unit("Gi") + .build(); + volumeManager.validateVolume(new VolumeId("some-volume-id"), volumeCap); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java new file mode 100644 index 00000000000..0b3320ffb19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; +import org.apache.hadoop.yarn.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; + +/** + * Test cases for volume processor. + */ +public class TestVolumeProcessor { + + private static final int GB = 1024; + private YarnConfiguration conf; + private RMNodeLabelsManager mgr; + private MockRM rm; + private MockNM[] mockNMS; + private RMNode[] rmNodes; + private static final int NUM_OF_NMS = 4; + // resource-types.xml will be created under target/test-classes/ dir, + // it must be deleted after the test is done, to avoid it from reading + // by other UT classes. + private File resourceTypesFile = null; + + private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume"; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + resourceTypesFile = new File(conf.getClassLoader() + .getResource(".").getPath(), "resource-types.xml"); + writeTmpResourceTypesFile(resourceTypesFile); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + conf.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + VolumeAMSProcessor.class.getName()); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + mockNMS = new MockNM[NUM_OF_NMS]; + rmNodes = new RMNode[NUM_OF_NMS]; + for (int i = 0; i < 4; i++) { + mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId()); + } + } + + @After + public void tearDown() { + if (resourceTypesFile != null && resourceTypesFile.exists()) { + resourceTypesFile.delete(); + } + } + + private void writeTmpResourceTypesFile(File tmpFile) throws IOException { + FileWriter fw = new FileWriter(tmpFile); + try { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME); + yarnConf.set("yarn.resource-types." + + VOLUME_RESOURCE_NAME + ".units", "Mi"); + yarnConf.writeXml(fw); + } finally { + fw.close(); + } + } + + @Test(timeout = 10000L) + public void testPreprovisionedVolumeValidation() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_VOLUME_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + am1.allocate(ar); + VolumeStates volumeStates = + rm.getRMContext().getVolumeManager().getVolumeStates(); + Assert.assertNotNull(volumeStates); + VolumeState volumeState = VolumeState.NEW; + while (volumeState != VolumeState.VALIDATED) { + Volume volume = volumeStates + .getVolume(new VolumeId("test-vol-000001")); + if (volume != null) { + volumeState = volume.getVolumeState(); + } + am1.doHeartbeat(); + mockNMS[0].nodeHeartbeat(true); + Thread.sleep(500); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeSpec.java new file mode 100644 index 00000000000..b1c2593d6a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeSpec.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.hadoop.yarn.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.volume.csi.VolumeSpec; +import org.apache.hadoop.yarn.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.volume.csi.VolumeId; +import org.apache.hadoop.yarn.volume.csi.VolumeCapability; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * Test cases for volume specification definition and parsing. + */ +public class TestVolumeSpec { + + @Test + public void testPreprovisionedVolume() throws InvalidVolumeException { + VolumeCapability cap = VolumeCapability.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume id is given, volume name is optional + VolumeSpec spec = VolumeSpec.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + + Assert.assertEquals(new VolumeId("id-000001"), spec.getVolumeId()); + Assert.assertEquals(1L, spec.getVolumeCapability().getMinCapacity()); + Assert.assertEquals(5L, spec.getVolumeCapability().getMaxCapacity()); + Assert.assertEquals("Gi", spec.getVolumeCapability().getUnit()); + Assert.assertEquals("csi-demo-driver", spec.getDriverName()); + Assert.assertEquals("/mnt/data", spec.getMountPoint()); + Assert.assertNull(spec.getVolumeName()); + Assert.assertTrue(spec.isPrevisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(spec.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_NAME)); + Assert.assertEquals("id-000001", + json.get(CsiConstants.CSI_VOLUME_ID).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + + } + + @Test + public void testDynamicalProvisionedVolume() throws InvalidVolumeException { + VolumeCapability cap = VolumeCapability.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume name is given, volume id is optional + VolumeSpec spec = VolumeSpec.newBuilder() + .volumeName("volume-name") + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + Assert.assertNotNull(spec); + + Assert.assertEquals("volume-name", spec.getVolumeName()); + Assert.assertEquals(1L, spec.getVolumeCapability().getMinCapacity()); + Assert.assertEquals(5L, spec.getVolumeCapability().getMaxCapacity()); + Assert.assertEquals("Gi", spec.getVolumeCapability().getUnit()); + Assert.assertEquals("csi-demo-driver", spec.getDriverName()); + Assert.assertEquals("/mnt/data", spec.getMountPoint()); + Assert.assertFalse(spec.isPrevisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(spec.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_ID)); + Assert.assertEquals("volume-name", + json.get(CsiConstants.CSI_VOLUME_NAME).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingMountpoint() throws InvalidVolumeException { + VolumeCapability cap = VolumeCapability.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeSpec.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .build(); + } + + + @Test(expected = InvalidVolumeException.class) + public void testMissingCsiDriverName() throws InvalidVolumeException { + VolumeCapability cap = VolumeCapability.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeSpec.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .mountPoint("/mnt/data") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingVolumeCapability() throws InvalidVolumeException { + VolumeSpec.newBuilder() + .volumeId(new VolumeId("id-000001")) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + } + + @Test + public void testVolumeId() { + VolumeId id1 = new VolumeId("test00001"); + VolumeId id11 = new VolumeId("test00001"); + VolumeId id2 = new VolumeId("test00002"); + + Assert.assertEquals(id1, id11); + Assert.assertEquals(id1.hashCode(), id11.hashCode()); + Assert.assertNotEquals(id1, id2); + + HashMap map = new HashMap<>(); + map.put(id1, "1"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("1", map.get(id11)); + map.put(id11, "2"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("2", map.get(id11)); + Assert.assertEquals("2", map.get(new VolumeId("test00001"))); + + Assert.assertNotEquals(id1, id2); + } +}