diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java index 0822163dc85..f0e5ebd29ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java @@ -19,6 +19,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -36,4 +40,10 @@ GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request) ValidateVolumeCapabilitiesResponse validateVolumeCapacity( ValidateVolumeCapabilitiesRequest request) throws YarnException, IOException; + + NodePublishVolumeResponse nodePublishVolume( + NodePublishVolumeRequest request) throws YarnException, IOException; + + NodeUnpublishVolumeResponse nodeUnpublishVolume( + NodeUnpublishVolumeRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java new file mode 100644 index 00000000000..177f3675774 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Map; + +public abstract class NodePublishVolumeRequest { + + public static NodePublishVolumeRequest newInstance(String volumeId, + boolean readOnly, String targetPath, String stagingPath, + VolumeCapability capability, + Map publishContext, + Map secrets) { + NodePublishVolumeRequest request = + Records.newRecord(NodePublishVolumeRequest.class); + request.setVolumeId(volumeId); + request.setReadonly(readOnly); + request.setTargetPath(targetPath); + request.setStagingPath(stagingPath); + request.setVolumeCapability(capability); + request.setPublishContext(publishContext); + request.setSecrets(secrets); + return request; + } + + public abstract void setVolumeId(String volumeId); + + public abstract String getVolumeId(); + + public abstract void setReadonly(boolean readonly); + + public abstract boolean getReadOnly(); + + public abstract void setTargetPath(String targetPath); + + public abstract String getTargetPath(); + + public abstract void setStagingPath(String stagingPath); + + public abstract String getStagingPath(); + + public abstract void setVolumeCapability(VolumeCapability capability); + + public abstract VolumeCapability getVolumeCapability(); + + public abstract void setPublishContext(Map publishContext); + + public abstract Map getPublishContext(); + + public abstract void setSecrets(Map secrets); + + public abstract Map getSecrets(); + + public String toString() { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("VolumeId", getVolumeId()); + jsonObject.addProperty("ReadOnly", getReadOnly()); + jsonObject.addProperty("TargetPath", getTargetPath()); + jsonObject.addProperty("StagingPath", getStagingPath()); + if (getVolumeCapability() != null) { + JsonObject jsonCap = new JsonObject(); + jsonCap.addProperty("AccessMode", + getVolumeCapability().getAccessMode().name()); + jsonCap.addProperty("VolumeType", + getVolumeCapability().getVolumeType().name()); + jsonObject.addProperty("VolumeCapability", + jsonCap.toString()); + } + return jsonObject.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java new file mode 100644 index 00000000000..b2496edf2d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class NodePublishVolumeResponse { + + public static NodePublishVolumeResponse newInstance() { + return Records.newRecord(NodePublishVolumeResponse.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java new file mode 100644 index 00000000000..1cfa47d9064 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class NodeUnpublishVolumeRequest { + + public static NodeUnpublishVolumeRequest newInstance(String volumeId, + String targetPath) { + NodeUnpublishVolumeRequest request = + Records.newRecord(NodeUnpublishVolumeRequest.class); + request.setVolumeId(volumeId); + request.setTargetPath(targetPath); + return request; + } + + public abstract void setVolumeId(String volumeId); + + public abstract void setTargetPath(String targetPath); + + public abstract String getVolumeId(); + + public abstract String getTargetPath(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java new file mode 100644 index 00000000000..b12e0ec3327 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class NodeUnpublishVolumeResponse {
+
+  public static NodeUnpublishVolumeResponse newInstance() {
+    return Records.newRecord(NodeUnpublishVolumeResponse.class);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index 047c09ac8b2..4209ca78ce6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -276,10 +276,10 @@ public static ResourceInformation newInstance(String name, String units) {
   }
 
   public static ResourceInformation newInstance(String name, String units,
-      long value, Map<String, String> attributes) {
+      long value, Set<String> tags, Map<String, String> attributes) {
     return ResourceInformation
         .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
-            Long.MAX_VALUE, null, attributes);
+            Long.MAX_VALUE, tags, attributes);
   }
 
   public static ResourceInformation newInstance(String name, String units,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 2f2528445d3..98cb19b7c23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3436,8 +3436,16 @@ public static boolean isAclEnabled(Configuration conf) {
    */
   public static final String NM_CSI_ADAPTOR_PREFIX =
       NM_PREFIX + "csi-driver-adaptor.";
+  public static final String NM_CSI_DRIVER_PREFIX =
+      NM_PREFIX + "csi-driver.";
+  public static final String NM_CSI_DRIVER_ENDPOINT_SUFFIX =
+      ".endpoint";
+  public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
+      ".address";
   public static final String NM_CSI_ADAPTOR_ADDRESSES =
       NM_CSI_ADAPTOR_PREFIX + "addresses";
+  public static final String NM_CSI_DRIVER_NAMES =
+      NM_CSI_DRIVER_PREFIX + "names";
 
   ////////////////////////////////
   // Other Configs
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiUtils.java
similarity index 68%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiUtils.java
index 77e69551806..ebc11cabb55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiUtils.java @@ -15,8 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.csi.utils; +package org.apache.hadoop.yarn.util.csi; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -24,13 +25,30 @@ import java.net.InetSocketAddress; /** - * Utility class to load configurations. + * Utility class for CSI in the API level. */ -public final class ConfigUtils { +public final class CsiUtils { - private ConfigUtils() { + private CsiUtils() { // Hide constructor for utility class. } + + public static String[] getCsiDriverNames(Configuration conf) { + return conf.getStrings(YarnConfiguration.NM_CSI_DRIVER_NAMES); + } + + public static String getCsiDriverEndpoint(String driverName, + Configuration conf) throws YarnException { + String driverEndpointProperty = YarnConfiguration.NM_CSI_DRIVER_PREFIX + + driverName + YarnConfiguration.NM_CSI_DRIVER_ENDPOINT_SUFFIX; + String driverEndpoint = conf.get(driverEndpointProperty); + if (Strings.isNullOrEmpty(driverEndpoint)) { + throw new YarnException("CSI driver's endpoint is not specified or" + + " invalid, property "+ driverEndpointProperty + " is not defined"); + } + return driverEndpoint; + } + /** * Resolve the CSI adaptor address for a CSI driver from configuration. * Expected configuration property name is @@ -43,7 +61,7 @@ private ConfigUtils() { public static InetSocketAddress getCsiAdaptorAddressForDriver( String driverName, Configuration conf) throws YarnException { String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX - + driverName + ".address"; + + driverName + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESS_SUFFIX; String errorMessage = "Failed to load CSI adaptor address for driver " + driverName + ", configuration property " + configName + " is not defined or invalid."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java new file mode 100644 index 00000000000..059aa6b67e5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.util.csi; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto index 9dcb8a73551..146f5bfba31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto @@ -31,4 +31,10 @@ service CsiAdaptorProtocolService { rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest) returns (ValidateVolumeCapabilitiesResponse); + + rpc nodePublishVolume (NodePublishVolumeRequest) + returns (NodePublishVolumeResponse); + + rpc nodeUnpublishVolume (NodeUnpublishVolumeRequest) + returns (NodeUnpublishVolumeResponse); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto index c9adbea7839..9b645e1a9e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto @@ -66,4 +66,28 @@ message GetPluginInfoRequest { message GetPluginInfoResponse { required string name = 1; required string vendor_version = 2; +} + +message NodePublishVolumeRequest { + required string volume_id = 1; + repeated StringStringMapProto publish_context = 2; + optional string staging_target_path = 3; + required string target_path = 4; + required VolumeCapability volume_capability = 5; + required bool readonly = 6; + repeated StringStringMapProto secrets = 7; + repeated StringStringMapProto volume_context = 8; +} + +message NodePublishVolumeResponse { + // Intentionally empty. +} + +message NodeUnpublishVolumeRequest { + required string volume_id = 1; + required string target_path = 2; +} + +message NodeUnpublishVolumeResponse { + // Intentionally empty. } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java index e466ce7ea57..39a9f754dbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java @@ -19,12 +19,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * ResourceInformation determines unit/name/value of resource types in addition to memory and vcores. 
It will be part of Resource object @@ -40,11 +42,25 @@ @SerializedName("attributes") private Map attributes = null; + @SerializedName("tags") + private Set tags = null; + public ResourceInformation value(Long value) { this.value = value; return this; } + public ResourceInformation tags(Set tags) { + this.tags = tags; + return this; + } + + @ApiModelProperty(value = "") + @JsonProperty("tags") + public Set getTags() { + return tags == null ? ImmutableSet.of() : tags; + } + @ApiModelProperty(value = "") @JsonProperty("attributes") public Map getAttributes() { @@ -116,6 +132,7 @@ public String toString() { sb.append(" unit: ").append(toIndentedString(unit)).append("\n"); sb.append(" attributes: ").append(toIndentedString(attributes)) .append("\n"); + sb.append(" tags: ").append(toIndentedString(tags)).append("\n"); sb.append("}"); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 9895fba82bf..f885b25d4da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -755,6 +755,7 @@ public void requestContainers(long count) { entry.getKey(), specInfo.getUnit(), specInfo.getValue(), + specInfo.getTags(), specInfo.getAttributes()); resource.setResourceInformation(resourceName, ri); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java index 25c502f6494..04c84dc1a2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java @@ -231,5 +231,6 @@ public void testSetResourceAttributes() throws IOException { Assert.assertEquals("yarn.io/csi-volume", volume.getKey()); Assert.assertEquals(100L, volume.getValue().getValue().longValue()); Assert.assertEquals(2, volume.getValue().getAttributes().size()); + Assert.assertEquals(1, volume.getValue().getTags().size()); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json index 74569bd0ba1..ef8e3238cc6 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json @@ -14,6 +14,7 @@ "yarn.io/csi-volume": { "value": 100, "unit": "Gi", + "tags": ["sample-tag"], "attributes" : { "driver" : "hostpath", "mountPath" : "/mnt/data" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java index 2e10f720468..a43d087a826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java @@ -25,10 +25,18 @@ import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,6 +90,34 @@ public ValidateVolumeCapabilitiesResponse validateVolumeCapacity( } } + @Override + public NodePublishVolumeResponse nodePublishVolume( + NodePublishVolumeRequest request) throws IOException, YarnException { + CsiAdaptorProtos.NodePublishVolumeRequest requestProto = + ((NodePublishVolumeRequestPBImpl) request).getProto(); + try { + return new NodePublishVolumeResponsePBImpl( + proxy.nodePublishVolume(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public NodeUnpublishVolumeResponse nodeUnpublishVolume( + NodeUnpublishVolumeRequest request) throws YarnException, IOException { + CsiAdaptorProtos.NodeUnpublishVolumeRequest requestProto = + ((NodeUnpublishVolumeRequestPBImpl) request).getProto(); + try { + return new NodeUnpublishVolumeResponsePBImpl( + 
proxy.nodeUnpublishVolume(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + @Override public void close() throws IOException { if(this.proxy != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java index 9a194351e56..624ad3730ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java @@ -23,9 +23,15 @@ import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -72,4 +78,34 @@ public CsiAdaptorProtocolPBServiceImpl(CsiAdaptorProtocol impl) { throw new ServiceException(e); } } + + @Override + public CsiAdaptorProtos.NodePublishVolumeResponse nodePublishVolume( + RpcController controller, + CsiAdaptorProtos.NodePublishVolumeRequest request) + throws ServiceException { + try { + NodePublishVolumeRequestPBImpl req = + new NodePublishVolumeRequestPBImpl(request); + NodePublishVolumeResponse response = real.nodePublishVolume(req); + return ((NodePublishVolumeResponsePBImpl) response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CsiAdaptorProtos.NodeUnpublishVolumeResponse nodeUnpublishVolume( + RpcController controller, + CsiAdaptorProtos.NodeUnpublishVolumeRequest request) + throws ServiceException { + try { + NodeUnpublishVolumeRequestPBImpl req = + new NodeUnpublishVolumeRequestPBImpl(request); + NodeUnpublishVolumeResponse response = real.nodeUnpublishVolume(req); + return ((NodeUnpublishVolumeResponsePBImpl) response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java new file mode 100644 index 00000000000..7751748b049 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.YarnProtos; + +import java.util.Map; + +public class NodePublishVolumeRequestPBImpl extends + NodePublishVolumeRequest { + + private CsiAdaptorProtos.NodePublishVolumeRequest.Builder builder; + + public NodePublishVolumeRequestPBImpl() { + this.builder = CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder(); + } + + public NodePublishVolumeRequestPBImpl( + CsiAdaptorProtos.NodePublishVolumeRequest request) { + this.builder = request.toBuilder(); + } + + public CsiAdaptorProtos.NodePublishVolumeRequest getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public void setVolumeId(String volumeId) { + Preconditions.checkNotNull(builder); + builder.setVolumeId(volumeId); + } + + @Override + public String getVolumeId() { + Preconditions.checkNotNull(builder); + return builder.getVolumeId(); + } + + @Override + public void setReadonly(boolean readonly) { + Preconditions.checkNotNull(builder); + builder.setReadonly(readonly); + } + + @Override + public boolean getReadOnly() { + Preconditions.checkNotNull(builder); + return builder.getReadonly(); + } + + @Override + public void setSecrets(Map secrets) { + if (secrets != null) { + Preconditions.checkNotNull(builder); + for(Map.Entry entry : secrets.entrySet()) { + YarnProtos.StringStringMapProto mapEntry = + YarnProtos.StringStringMapProto.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build(); + builder.addSecrets(mapEntry); + } + } + } + + @Override + public Map getSecrets() { + Preconditions.checkNotNull(builder); + return builder.getSecretsCount() > 0 ? 
+ ProtoUtils.convertStringStringMapProtoListToMap( + builder.getSecretsList()) : ImmutableMap.of(); + } + + @Override + public String getTargetPath() { + Preconditions.checkNotNull(builder); + return builder.getTargetPath(); + } + + @Override + public void setStagingPath(String stagingPath) { + Preconditions.checkNotNull(builder); + builder.setStagingTargetPath(stagingPath); + } + + @Override + public String getStagingPath() { + Preconditions.checkNotNull(builder); + return builder.getStagingTargetPath(); + } + + @Override + public void setPublishContext(Map publishContext) { + if (publishContext != null) { + Preconditions.checkNotNull(builder); + for(Map.Entry entry : publishContext.entrySet()) { + YarnProtos.StringStringMapProto mapEntry = + YarnProtos.StringStringMapProto.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build(); + builder.addPublishContext(mapEntry); + } + } + } + + @Override + public Map getPublishContext() { + Preconditions.checkNotNull(builder); + return builder.getPublishContextCount() > 0 ? + ProtoUtils.convertStringStringMapProtoListToMap( + builder.getPublishContextList()) : ImmutableMap.of(); + } + + @Override + public void setTargetPath(String targetPath) { + if (targetPath != null) { + Preconditions.checkNotNull(builder); + builder.setTargetPath(targetPath); + } + } + + @Override + public void setVolumeCapability( + VolumeCapability capability) { + if (capability != null) { + CsiAdaptorProtos.VolumeCapability vc = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(CsiAdaptorProtos.VolumeCapability + .AccessMode.valueOf( + capability.getAccessMode().ordinal())) + .setVolumeType(CsiAdaptorProtos.VolumeCapability + .VolumeType.valueOf(capability.getVolumeType().ordinal())) + .addAllMountFlags(capability.getMountFlags()) + .build(); + builder.setVolumeCapability(vc); + } + } + + @Override + public VolumeCapability getVolumeCapability() { + CsiAdaptorProtos.VolumeCapability cap0 = builder.getVolumeCapability(); + if (builder.hasVolumeCapability()) { + return new VolumeCapability( + ValidateVolumeCapabilitiesRequest.AccessMode + .valueOf(cap0.getAccessMode().name()), + ValidateVolumeCapabilitiesRequest.VolumeType + .valueOf(cap0.getVolumeType().name()), + cap0.getMountFlagsList()); + } + return null; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java new file mode 100644 index 00000000000..34e4f2fee42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +public class NodePublishVolumeResponsePBImpl + extends NodePublishVolumeResponse { + + private CsiAdaptorProtos.NodePublishVolumeResponse.Builder builder; + + public NodePublishVolumeResponsePBImpl( + CsiAdaptorProtos.NodePublishVolumeResponse proto) { + this.builder = proto.toBuilder(); + } + + public NodePublishVolumeResponsePBImpl() { + this.builder = CsiAdaptorProtos.NodePublishVolumeResponse + .newBuilder(); + } + + public CsiAdaptorProtos.NodePublishVolumeResponse getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java new file mode 100644 index 00000000000..49ad5b94e7e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +public class NodeUnpublishVolumeRequestPBImpl extends + NodeUnpublishVolumeRequest { + + private CsiAdaptorProtos.NodeUnpublishVolumeRequest.Builder builder; + + public NodeUnpublishVolumeRequestPBImpl() { + this.builder = CsiAdaptorProtos.NodeUnpublishVolumeRequest.newBuilder(); + } + + public NodeUnpublishVolumeRequestPBImpl( + CsiAdaptorProtos.NodeUnpublishVolumeRequest request) { + this.builder = request.toBuilder(); + } + + public CsiAdaptorProtos.NodeUnpublishVolumeRequest getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public void setVolumeId(String volumeId) { + Preconditions.checkNotNull(builder); + this.builder.setVolumeId(volumeId); + } + + @Override + public void setTargetPath(String targetPath) { + Preconditions.checkNotNull(builder); + this.builder.setTargetPath(targetPath); + } + + @Override + public String getVolumeId() { + return builder.getVolumeId(); + } + + @Override + public String getTargetPath() { + return builder.getTargetPath(); + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java new file mode 100644 index 00000000000..25b8ab89b35 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+public class NodeUnpublishVolumeResponsePBImpl extends
+    NodeUnpublishVolumeResponse {
+
+  private CsiAdaptorProtos.NodeUnpublishVolumeResponse.Builder builder;
+
+  public NodeUnpublishVolumeResponsePBImpl() {
+    this.builder = CsiAdaptorProtos.NodeUnpublishVolumeResponse.newBuilder();
+  }
+
+  public NodeUnpublishVolumeResponsePBImpl(
+      CsiAdaptorProtos.NodeUnpublishVolumeResponse response) {
+    this.builder = response.toBuilder();
+  }
+
+  public CsiAdaptorProtos.NodeUnpublishVolumeResponse getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e7a0e1406ce..8c5c5314de7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4050,4 +4050,24 @@
     <name>yarn.nodemanager.csi-driver-adaptor.addresses</name>
   </property>
+
+  <property>
+    <description>
+      CSI driver names running on this node; multiple driver names must be
+      delimited by commas. Each driver name should be the same value returned
+      by that driver's getPluginInfo call. For each CSI driver name, the
+      following two corresponding properties must be defined:
+      "yarn.nodemanager.csi-driver.${NAME}.endpoint"
+      "yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
+      The first property defines the driver's endpoint; it must be a valid
+      UNIX domain socket path on which the driver listens and serves gRPC
+      requests from the driver adaptor. The second property defines the CSI
+      driver adaptor's service address for that specific driver. The adaptor
+      runs side-by-side with the driver instance and listens on a Hadoop RPC
+      endpoint so that the RM/NM can talk to it.
+    </description>
+    <name>yarn.nodemanager.csi-driver.names</name>
+    <value>ozones3</value>
+  </property>
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
index d31c0c9b75f..837b667a5e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
@@ -40,4 +40,10 @@
   Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
       Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
+
+  Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException;
+
+  Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException;
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
index 5b3d2e23c31..0a107e16b5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
@@ -59,4 +59,24 @@ public GetPluginInfoResponse getPluginInfo() throws IOException {
         .validateVolumeCapabilities(request);
     }
   }
+
+  @Override
+  public Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      return client.createNodeBlockingStub()
+          .nodePublishVolume(request);
+    }
+  }
+
+  @Override
+  public Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      return client.createNodeBlockingStub()
+          .nodeUnpublishVolume(request);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
index c4f042ebc18..bcf634addcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
@@ -25,7 +25,7 @@
 * Protobuf message translator for GetPluginInfoResponse and
 * Csi.GetPluginInfoResponse.
*/ -public class GetPluginInfoResponseProtoTranslator implements +public class GetPluginInfoResponseProtoTranslator implements ProtoTranslator { @Override public Csi.GetPluginInfoResponse convertTo( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java new file mode 100644 index 00000000000..fc8465652aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.translator; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class NodePublishVolumeRequestProtoTranslator implements + ProtoTranslator { + + @Override + public Csi.NodePublishVolumeRequest convertTo( + NodePublishVolumeRequest messageA) throws YarnException { + Csi.NodePublishVolumeRequest.Builder builder = + Csi.NodePublishVolumeRequest.newBuilder(); + ValidateVolumeCapabilitiesRequest.VolumeCapability cap = + messageA.getVolumeCapability(); + Csi.VolumeCapability cap1 = Csi.VolumeCapability.newBuilder() + .setAccessMode(Csi.VolumeCapability.AccessMode.newBuilder() + .setModeValue(cap.getAccessMode().ordinal())) // access mode + // TODO support block + .setMount(Csi.VolumeCapability.MountVolume.newBuilder() + // TODO support fsType + .setFsType("xfs") // fs type + .addAllMountFlags(cap.getMountFlags())) // mount flags + .build(); + builder.setVolumeCapability(cap1); + builder.setVolumeId(messageA.getVolumeId()); + builder.setTargetPath(messageA.getTargetPath()); + builder.setReadonly(messageA.getReadOnly()); + builder.putAllNodePublishSecrets(messageA.getSecrets()); + builder.putAllPublishInfo(messageA.getPublishContext()); + builder.setStagingTargetPath(messageA.getStagingPath()); + return builder.build(); + } + + @Override + public NodePublishVolumeRequest convertFrom( + Csi.NodePublishVolumeRequest messageB) throws YarnException { + Csi.VolumeCapability cap0 = messageB.getVolumeCapability(); + ValidateVolumeCapabilitiesRequest.VolumeCapability cap = + new ValidateVolumeCapabilitiesRequest.VolumeCapability( + ValidateVolumeCapabilitiesRequest.AccessMode + .valueOf(cap0.getAccessMode().getMode().name()), + ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM, + cap0.getMount().getMountFlagsList()); + return 
NodePublishVolumeRequest.newInstance( + messageB.getVolumeId(), messageB.getReadonly(), + messageB.getTargetPath(), messageB.getStagingTargetPath(), + cap, messageB.getPublishInfoMap(), + messageB.getNodePublishSecretsMap()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java new file mode 100644 index 00000000000..a560a5f5bf8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.translator; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class NodeUnpublishVolumeRequestProtoTranslator implements + ProtoTranslator { + + @Override + public Csi.NodeUnpublishVolumeRequest convertTo( + NodeUnpublishVolumeRequest messageA) throws YarnException { + return Csi.NodeUnpublishVolumeRequest.newBuilder() + .setVolumeId(messageA.getVolumeId()) + .setTargetPath(messageA.getTargetPath()) + .build(); + } + + @Override + public NodeUnpublishVolumeRequest convertFrom( + Csi.NodeUnpublishVolumeRequest messageB) throws YarnException { + return NodeUnpublishVolumeRequest + .newInstance(messageB.getVolumeId(), messageB.getTargetPath()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java index 5eb76ffab5f..1a7306f0bf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.csi.translator; import csi.v0.Csi; +import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; /** * Factory class to get desired proto transformer instance. 
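The next hunk registers translators for the new publish/unpublish records in ProtoTranslatorFactory. A minimal usage sketch of that path follows, illustrative only: it assumes the factory's existing lookup method is getTranslator(Class, Class), which is not shown in these hunks, and the volume id, paths, and empty context/secret maps are hypothetical placeholders.

import com.google.common.collect.ImmutableMap;
import csi.v0.Csi;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
import org.apache.hadoop.yarn.csi.client.CsiClient;
import org.apache.hadoop.yarn.csi.translator.ProtoTranslator;
import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.IOException;

public final class NodePublishSketch {

  private NodePublishSketch() {
  }

  /** Translate a YARN-side publish request to the CSI v0 proto and send it. */
  static Csi.NodePublishVolumeResponse publish(CsiClient client,
      VolumeCapability capability) throws YarnException, IOException {
    // Look up the translator registered in the hunk below
    // (getTranslator is an assumed factory method, not part of this diff).
    ProtoTranslator<NodePublishVolumeRequest, Csi.NodePublishVolumeRequest>
        translator = ProtoTranslatorFactory.getTranslator(
            NodePublishVolumeRequest.class,
            Csi.NodePublishVolumeRequest.class);

    // Build the YARN record with the newInstance(...) added in this patch.
    NodePublishVolumeRequest yarnRequest =
        NodePublishVolumeRequest.newInstance(
            "volume-0001",                  // hypothetical volume id
            false,                          // readOnly
            "/tmp/csi/volume-0001_mount",   // hypothetical target path
            "/tmp/csi/volume-0001_staging", // hypothetical staging path
            capability,
            ImmutableMap.of(),              // publish context
            ImmutableMap.of());             // secrets

    // Convert to the CSI proto and call the driver through the CsiClient.
    return client.nodePublishVolume(translator.convertTo(yarnRequest));
  }
}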
@@ -57,6 +60,15 @@ private ProtoTranslatorFactory() { } else if (yarnProto == ValidateVolumeCapabilitiesResponse.class && csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) { return new ValidationVolumeCapabilitiesResponseProtoTranslator(); + } else if (yarnProto == NodePublishVolumeRequest.class + && csiProto == Csi.NodePublishVolumeRequest.class) { + return new NodePublishVolumeRequestProtoTranslator(); + } else if (yarnProto == GetPluginInfoResponse.class + && csiProto == Csi.GetPluginInfoResponse.class) { + return new GetPluginInfoResponseProtoTranslator(); + } else if (yarnProto == NodeUnpublishVolumeRequest.class + && csiProto == Csi.NodeUnpublishVolumeRequest.class) { + return new NodeUnpublishVolumeRequestProtoTranslator(); } throw new IllegalArgumentException("A problem is found while processing" + " proto message translating. Unexpected message types," diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java index 128240d86d3..4e6e613ca4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.csi.client.CsiClient; +import org.apache.hadoop.yarn.csi.client.ICsiClientTest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.junit.AfterClass; @@ -87,7 +87,7 @@ public void testValidateVolume() throws IOException, YarnException { // inject a fake CSI client // this client validates if the ValidateVolumeCapabilitiesRequest // is integrity, and then reply a fake response - service.setCsiClient(new CsiClient() { + service.setCsiClient(new ICsiClientTest() { @Override public Csi.GetPluginInfoResponse getPluginInfo() { return Csi.GetPluginInfoResponse.newBuilder() @@ -163,7 +163,7 @@ public void testValidateVolumeWithNMProxy() throws Exception { // inject a fake CSI client // this client validates if the ValidateVolumeCapabilitiesRequest // is integrity, and then reply a fake response - service.setCsiClient(new CsiClient() { + service.setCsiClient(new ICsiClientTest() { @Override public Csi.GetPluginInfoResponse getPluginInfo() { return Csi.GetPluginInfoResponse.newBuilder() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java new file mode 100644 index 00000000000..48b97c11e28 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
index 128240d86d3..4e6e613ca4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
 import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.junit.AfterClass;
@@ -87,7 +87,7 @@ public void testValidateVolume() throws IOException, YarnException {
     // inject a fake CSI client
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new CsiClient() {
+    service.setCsiClient(new ICsiClientTest() {
       @Override
       public Csi.GetPluginInfoResponse getPluginInfo() {
         return Csi.GetPluginInfoResponse.newBuilder()
@@ -163,7 +163,7 @@ public void testValidateVolumeWithNMProxy() throws Exception {
     // inject a fake CSI client
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new CsiClient() {
+    service.setCsiClient(new ICsiClientTest() {
       @Override
       public Csi.GetPluginInfoResponse getPluginInfo() {
         return Csi.GetPluginInfoResponse.newBuilder()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
new file mode 100644
index 00000000000..48b97c11e28
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi;
+
+import java.io.IOException;
+
+public interface ICsiClientTest extends CsiClient {
+
+  @Override
+  default Csi.GetPluginInfoResponse getPluginInfo()
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+      Csi.ValidateVolumeCapabilitiesRequest request) throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException {
+    return null;
+  }
+}
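ICsiClientTest exists so a test can stub only the call it exercises while every other method falls back to the null defaults above. A minimal sketch of such a fake follows; the helper class and method names are hypothetical and not part of the patch.

import csi.v0.Csi;
import org.apache.hadoop.yarn.csi.client.ICsiClientTest;

public class FakeCsiClientExample {
  // Override only the method under test; other CsiClient calls keep the
  // null defaults inherited from ICsiClientTest.
  static ICsiClientTest publishOnlyFake() {
    return new ICsiClientTest() {
      @Override
      public Csi.NodePublishVolumeResponse nodePublishVolume(
          Csi.NodePublishVolumeRequest request) {
        // A real test would assert on the request fields here.
        return Csi.NodePublishVolumeResponse.newBuilder().build();
      }
    };
  }
}

A test would then inject it the same way the updated TestCsiAdaptorService does, e.g. service.setCsiClient(publishOnlyFake()).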
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index 225bc199250..2790ef2bc97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -21,10 +21,20 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
@@ -35,7 +45,10 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
 import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
+import org.apache.hadoop.yarn.util.csi.CsiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +82,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -76,6 +90,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -262,6 +277,7 @@
   private Configuration conf;
   private Context nmContext;
   private DockerClient dockerClient;
+  private Map<String, CsiAdaptorProtocol> csiClients = new HashMap<>();
   private PrivilegedOperationExecutor privilegedOperationExecutor;
   private String defaultImageName;
   private Set<String> allowedNetworks = new HashSet<>();
@@ -363,6 +379,9 @@ public void initialize(Configuration conf, Context nmContext)
       throw new ContainerExecutionException(message);
     }
 
+    // initialize csi adaptor clients if necessary
+    initiateCsiClients(conf);
+
     privilegedContainersAcl = new AccessControlList(conf.getTrimmed(
         YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
         YarnConfiguration.DEFAULT_NM_DOCKER_PRIVILEGED_CONTAINERS_ACL));
@@ -964,6 +983,18 @@ public void launchContainer(ContainerRuntimeContext ctx)
       }
     }
 
+    try {
+      prepareCsiVolumes(container, container.getWorkDir(), runCommand);
+    } catch (InvalidVolumeException e) {
+      throw new ContainerExecutionException(
+          "Container requested volume resources but some of the"
+              + " specified volumes are invalid");
+    } catch (YarnException | IOException e) {
+      throw new ContainerExecutionException(
+          "Container requested volume resources but publishing the"
+              + " volumes on this node failed");
+    }
+
     if (allowHostPidNamespace(container)) {
       runCommand.setPidNamespace("host");
     }
@@ -1121,8 +1152,12 @@ public void signalContainer(ContainerRuntimeContext ctx)
         executeLivelinessCheck(ctx);
       } else if (ContainerExecutor.Signal.TERM.equals(signal)) {
         ContainerId containerId = ctx.getContainer().getContainerId();
+        // un-publish volumes if necessary
+        cleanupCsiVolumes(ctx);
         handleContainerStop(containerId, env);
       } else {
+        // un-publish volumes if necessary
+        cleanupCsiVolumes(ctx);
         handleContainerKill(ctx, env, signal);
       }
     } catch (ContainerExecutionException e) {
@@ -1517,4 +1552,165 @@ private void addDockerClientConfigToRunCommand(ContainerRuntimeContext ctx,
     }
   }
 
+  private File getLocalVolumeMountPath(
+      String containerWorkDir, String volumeId) {
+    File csiRoot = new File(containerWorkDir, "csi");
+    return new File(csiRoot, volumeId + "_mount");
+  }
+
+  private File getLocalVolumeStagingPath(
+      String containerWorkDir, String volumeId) {
+    File csiRoot = new File(containerWorkDir, "csi");
+    return new File(csiRoot, volumeId + "_staging");
+  }
+
+  /**
+   * Prepare CSI volumes if the container resource contains volume resources.
+   * It first discovers the volume info from the container resource, then
+   * negotiates with the CSI driver adaptor to publish the volume on this
+   * node manager, under a dedicated directory in the container's work dir,
+   * and finally maps the locally mounted directory to the volume's target
+   * mount inside the docker container.
+   *
+   * CSI volume publishing is a two-phase process. By the time we reach this
+   * point, the first phase is assumed to be done on the RM side, i.e. YARN
+   * has already called the CSI driver's controller service to publish the
+   * volume; here we only need to call the CSI driver's node service to
+   * publish the volume on this node manager.
+   */
+  private void prepareCsiVolumes(
+      Container container, String localMountRoot, DockerRunCommand runCommand)
+      throws YarnException, IOException {
+    // Discover volume resources and publish them on this node
+    Resource containerResource = container.getResource();
+    for (ResourceInformation resourceInformation :
+        containerResource.getAllResourcesListCopy()) {
+      if (resourceInformation.getTags().contains("system:csi-volume")) {
+        LOG.info("Found volume resource in resource list");
+        List<VolumeMetaData> metaList =
+            VolumeMetaData.fromResource(resourceInformation);
+        for (VolumeMetaData meta : metaList) {
+          // compose a local mount for the CSI volume under the work dir
+          File localMount = getLocalVolumeMountPath(localMountRoot,
+              meta.getVolumeId().toString());
+          File localStaging = getLocalVolumeStagingPath(localMountRoot,
+              meta.getVolumeId().toString());
+          NodePublishVolumeRequest publishRequest = NodePublishVolumeRequest
+              .newInstance(meta.getVolumeId().getId(), // volume id
+                  false, // read only flag
+                  localMount.getAbsolutePath(), // target path
+                  localStaging.getAbsolutePath(), // staging path
+                  new ValidateVolumeCapabilitiesRequest.VolumeCapability(
+                      ValidateVolumeCapabilitiesRequest
+                          .AccessMode.SINGLE_NODE_WRITER,
+                      ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
+                      ImmutableList.of()), // capability
+                  ImmutableMap.of(), // publish context
+                  ImmutableMap.of()); // secrets
+
+          if (csiClients.get(meta.getDriverName()) == null) {
+            throw new YarnException("No csi-adaptor is found that can talk"
+                + " to csi-driver " + meta.getDriverName());
+          }
+          // publish the volume on this node
+          LOG.info("Publish volume on NM, request {}",
+              publishRequest.toString());
+          csiClients.get(meta.getDriverName())
+              .nodePublishVolume(publishRequest);
+          // once published, bind-mount the local directory into the container
+          String containerMountPath = meta.getMountPoint();
+          LOG.info("Adding bind-mount: src=" + localMount.getAbsolutePath()
+              + ", dest=" + containerMountPath + ", mode=rw");
+          runCommand.addMountLocation(localMount.getAbsolutePath(),
+              containerMountPath, "rw");
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No volume resource is found with system:csi-volume tag");
+        }
+      }
+    }
+  }
+
+  private void cleanupCsiVolumes(ContainerRuntimeContext ctx)
+      throws ContainerExecutionException {
+    // Un-publish the CSI volumes that were published for this container
+    Container container = ctx.getContainer();
+    LOG.info("Executing un-publish volume for container "
+        + container.getContainerId().toString()
+        + ", container resource: " + container.getResource().toString());
+    Resource containerResource = container.getResource();
+    for (ResourceInformation resourceInformation :
+        containerResource.getAllResourcesListCopy()) {
+      if (resourceInformation.getTags().contains("system:csi-volume")) {
+        LOG.info("Found volume resource in resource list");
+        List<VolumeMetaData> metaList;
+        try {
+          metaList = VolumeMetaData.fromResource(resourceInformation);
+        } catch (InvalidVolumeException e) {
+          throw new ContainerExecutionException(
+              "Invalid volume found in the request");
+        }
+        for (VolumeMetaData meta : metaList) {
+          // The volume was mounted under the container work dir when the
+          // container was launched, so resolve the same local path here.
+          File localMount = getLocalVolumeMountPath(container.getWorkDir(),
+              meta.getVolumeId().toString());
+          if (!localMount.exists()) {
+            LOG.info("Local mount {} no longer exists, skipping the volume"
+                + " clean-up", localMount.getAbsolutePath());
+            continue;
+          }
+          NodeUnpublishVolumeRequest unpublishRequest =
+              NodeUnpublishVolumeRequest.newInstance(
+                  meta.getVolumeId().getId(), // volume id
+                  localMount.getAbsolutePath()); // target path
+          if (csiClients.get(meta.getDriverName()) == null) {
+            throw new ContainerExecutionException(
+                "No csi-adaptor is found that can talk"
+                    + " to csi-driver " + meta.getDriverName());
+          }
+          // un-publish the volume from this node
+          LOG.info("Un-publish volume on NM, request {}",
+              unpublishRequest.toString());
+          try {
+            csiClients.get(meta.getDriverName())
+                .nodeUnpublishVolume(unpublishRequest);
+          } catch (YarnException | IOException e) {
+            throw new ContainerExecutionException("Un-publish volume failed");
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Initialize CSI adaptor clients that talk to the CSI adaptors running on
+   * this node, and cache the clients so they can be looked up by driver name.
+   * @param config configuration
+   * @throws ContainerExecutionException if a client cannot be created
+   */
+  private void initiateCsiClients(Configuration config)
+      throws ContainerExecutionException {
+    String[] driverNames = CsiUtils.getCsiDriverNames(config);
+    if (driverNames != null && driverNames.length > 0) {
+      for (String driverName : driverNames) {
+        try {
+          // find out the adaptor's service address for this driver
+          InetSocketAddress adaptorServiceAddress =
+              CsiUtils.getCsiAdaptorAddressForDriver(driverName, config);
+          LOG.info("Initializing a csi-adaptor-client for csi-adaptor {},"
+              + " csi-driver {}", adaptorServiceAddress.toString(), driverName);
+          CsiAdaptorProtocolPBClientImpl client =
+              new CsiAdaptorProtocolPBClientImpl(1L, adaptorServiceAddress,
+                  config);
+          csiClients.put(driverName, client);
+        } catch (IOException e1) {
+          throw new ContainerExecutionException(e1.getMessage());
+        } catch (YarnException e2) {
+          throw new ContainerExecutionException(e2.getMessage());
+        }
+      }
+    }
+  }
 }
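Taken together, the publish path drops each volume under the container work dir at csi/<volume id>_mount and bind-mounts it read-write at the mount point declared in the volume resource, and the un-publish path reverses that on container stop. The sketch below drives the same request pairing directly against a CSI adaptor client; it is illustrative only: the class name, adaptor address, volume id and paths are made-up values, and error handling is omitted.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;

public class CsiAdaptorPublishSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative adaptor address; the runtime resolves it via CsiUtils.
    CsiAdaptorProtocol adaptor = new CsiAdaptorProtocolPBClientImpl(
        1L, new InetSocketAddress("localhost", 20610), new Configuration());

    // Publish the volume on this node, mirroring prepareCsiVolumes().
    adaptor.nodePublishVolume(NodePublishVolumeRequest.newInstance(
        "vol-0001", // volume id
        false, // read only flag
        "/tmp/container_01/csi/vol-0001_mount", // target path
        "/tmp/container_01/csi/vol-0001_staging", // staging path
        new ValidateVolumeCapabilitiesRequest.VolumeCapability(
            ValidateVolumeCapabilitiesRequest.AccessMode.SINGLE_NODE_WRITER,
            ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
            ImmutableList.of()),
        ImmutableMap.of(), // publish context
        ImmutableMap.of())); // secrets

    // Un-publish on container stop, mirroring cleanupCsiVolumes().
    adaptor.nodeUnpublishVolume(NodeUnpublishVolumeRequest.newInstance(
        "vol-0001", "/tmp/container_01/csi/vol-0001_mount"));
  }
}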