diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 937f21f030f..0a747d43baa 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -464,6 +464,12 @@ test-jar + + org.apache.hadoop + hadoop-yarn-csi + ${hadoop.version} + + org.apache.hadoop hadoop-mapreduce-client-jobclient diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml index 41f50985960..0838ae0cedf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml @@ -77,6 +77,11 @@ junit junit + + org.mockito + mockito-all + test + org.apache.hadoop hadoop-common @@ -84,9 +89,12 @@ test - javax.annotation - javax.annotation-api - compile + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-yarn-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptor.java new file mode 100644 index 00000000000..270cb4c313b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +/** + * CSI adaptor delegates all the calls from YARN to a CSI driver. + */ +public interface CsiAdaptor { + + ValidateVolumeCapabilitiesResponse validateVolumeCapacity( + ValidateVolumeCapabilitiesRequest request) throws YarnException, + IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorPB.java new file mode 100644 index 00000000000..d9f5cc199f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorPB.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.CsiAdaptor; + +@ProtocolInfo( + protocolName = "org.apache.hadoop.yarn.csi.adaptor.CsiAdaptorPB", + protocolVersion = 1) +public interface CsiAdaptorPB + extends CsiAdaptor.CsiAdaptorService.BlockingInterface { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorService.java new file mode 100644 index 00000000000..ece3f312999 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorService.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import com.google.common.annotations.VisibleForTesting; +import csi.v0.Csi; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.csi.client.CsiClient; +import org.apache.hadoop.yarn.csi.client.CsiClientImpl; +import org.apache.hadoop.yarn.csi.utils.ProtoTransformerFactory; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * This is a Hadoop RPC server, we uses the Hadoop RPC framework here + * because we need to stick to the security model current Hadoop supports. + */ +public class CsiAdaptorService extends AbstractService + implements CsiAdaptor { + + private static final Logger LOG = + LoggerFactory.getLogger(CsiAdaptorService.class); + + private Server server; + private InetSocketAddress adaptorServiceAddress; + private CsiClient csiClient; + + public CsiAdaptorService(String domainSocketPath) { + super(CsiAdaptorService.class.getName()); + this.csiClient = new CsiClientImpl(domainSocketPath); + } + + @VisibleForTesting + public void setCsiClient(CsiClient client) { + this.csiClient = client; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + adaptorServiceAddress = conf.getSocketAddr( + "yarn.nodemanager.csi.adaptor.address", + "0.0.0.0:9888", + 9888); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + this.server = rpc.getServer( + CsiAdaptor.class, + this, adaptorServiceAddress, conf, null, 1); + this.server.start(); + LOG.info("{} started, listening on address: {}", + CsiAdaptorService.class.getName(), + adaptorServiceAddress.toString()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + super.serviceStop(); + } + + @Override + public ValidateVolumeCapabilitiesResponse validateVolumeCapacity( + ValidateVolumeCapabilitiesRequest request) throws YarnException, + IOException { + Csi.ValidateVolumeCapabilitiesRequest req = ProtoTransformerFactory + .getTransformer(ValidateVolumeCapabilitiesRequest.class, + Csi.ValidateVolumeCapabilitiesRequest.class) + .convertTo(request); + Csi.ValidateVolumeCapabilitiesResponse response = + csiClient.validateVolumeCapabilities(req); + return ProtoTransformerFactory.getTransformer( + ValidateVolumeCapabilitiesResponse.class, + Csi.ValidateVolumeCapabilitiesResponse.class) + .convertFrom(response); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/CsiAdaptorPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/CsiAdaptorPBClientImpl.java new file mode 100644 index 00000000000..f9e0a7ce02a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/CsiAdaptorPBClientImpl.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor.impl.pb.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.csi.adaptor.CsiAdaptor; +import org.apache.hadoop.yarn.csi.adaptor.CsiAdaptorPB; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * CSI adaptor client implementation. + */ +public class CsiAdaptorPBClientImpl implements CsiAdaptor, Closeable { + + private final CsiAdaptorPB proxy; + + public CsiAdaptorPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, CsiAdaptorPB.class, ProtobufRpcEngine.class); + this.proxy = RPC.getProxy(CsiAdaptorPB.class, clientVersion, addr, conf); + } + + @Override + public ValidateVolumeCapabilitiesResponse validateVolumeCapacity( + ValidateVolumeCapabilitiesRequest request) + throws YarnException, IOException { + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto = + ((ValidateVolumeCapabilitiesRequestPBImpl) request).getProto(); + try { + return new ValidateVolumeCapabilitiesResponsePBImpl( + proxy.validateVolumeCapacity(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public void close() throws IOException { + if(this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/package-info.java new file mode 100644 index 00000000000..3e0572767aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/client/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains CSI adaptor client class. + */ +package org.apache.hadoop.yarn.csi.adaptor.impl.pb.client; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/CsiAdaptorPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/CsiAdaptorPBServiceImpl.java new file mode 100644 index 00000000000..f0804c1f833 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/CsiAdaptorPBServiceImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor.impl.pb.service; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.yarn.csi.adaptor.CsiAdaptor; +import org.apache.hadoop.yarn.csi.adaptor.CsiAdaptorPB; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.io.IOException; + +/** + * CSI adaptor server side implementation, this is hosted on a node manager. + */ +public class CsiAdaptorPBServiceImpl implements CsiAdaptorPB { + + private final CsiAdaptor real; + public CsiAdaptorPBServiceImpl(CsiAdaptor impl) { + this.real = impl; + } + + @Override + public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse + validateVolumeCapacity(RpcController controller, + ValidateVolumeCapabilitiesRequest request) throws ServiceException { + try { + org.apache.hadoop.yarn.csi.api.protocolrecords + .ValidateVolumeCapabilitiesRequest req = + new ValidateVolumeCapabilitiesRequestPBImpl(request); + ValidateVolumeCapabilitiesResponse response = + real.validateVolumeCapacity(req); + return ((ValidateVolumeCapabilitiesResponsePBImpl) response).getProto(); + } catch (YarnException | IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/package-info.java new file mode 100644 index 00000000000..87c86bc534d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/impl/pb/service/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains CSI adaptor server class. + */ +package org.apache.hadoop.yarn.csi.adaptor.impl.pb.service; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java new file mode 100644 index 00000000000..919aab587aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains CSI adaptor classes. + */ +package org.apache.hadoop.yarn.csi.adaptor; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java new file mode 100644 index 00000000000..96447e771d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; +import java.util.Map; + +/** + * YARN internal message used to validate volume capabilities + * with a CSI driver controller plugin. + */ +public abstract class ValidateVolumeCapabilitiesRequest { + + public enum AccessMode { + UNKNOWN, + SINGLE_NODE_WRITER, + SINGLE_NODE_READER_ONLY, + MULTI_NODE_READER_ONLY, + MULTI_NODE_SINGLE_WRITER, + MULTI_NODE_MULTI_WRITER, + } + + public enum VolumeType { + BLOCK, + FILE_SYSTEM + } + + public static class VolumeCapability { + + private AccessMode mode; + private VolumeType type; + private List flags; + + public VolumeCapability(AccessMode accessMode, VolumeType volumeType, + List mountFlags) { + this.mode = accessMode; + this.type = volumeType; + this.flags = mountFlags; + } + + public AccessMode getAccessMode() { + return mode; + } + + public VolumeType getVolumeType() { + return type; + } + + public List getMountFlags() { + return flags; + } + } + + public static ValidateVolumeCapabilitiesRequest newInstance( + String volumeId, VolumeCapability volumeCapability, + Map volumeAttributes) { + ValidateVolumeCapabilitiesRequest request = + Records.newRecord(ValidateVolumeCapabilitiesRequest.class); + request.setVolumeId(volumeId); + request.setVolumeAttributes(volumeAttributes); + request.addVolumeCapability(volumeCapability); + return request; + } + + public static ValidateVolumeCapabilitiesRequest newInstance( + String volumeId, Map volumeAttributes) { + ValidateVolumeCapabilitiesRequest request = + Records.newRecord(ValidateVolumeCapabilitiesRequest.class); + request.setVolumeId(volumeId); + request.setVolumeAttributes(volumeAttributes); + return request; + } + + public abstract void setVolumeId(String volumeId); + + public abstract String getVolumeId(); + + public abstract void setVolumeAttributes(Map attributes); + + public abstract Map getVolumeAttributes(); + + public abstract void addVolumeCapability(VolumeCapability volumeCapability); + + public abstract List getVolumeCapabilities(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java new file mode 100644 index 00000000000..ba8cdfdea7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * YARN internal message used to represent the response of + * volume capabilities validation with a CSI driver controller plugin. + */ +public abstract class ValidateVolumeCapabilitiesResponse { + + public static ValidateVolumeCapabilitiesResponse newInstance( + boolean supported, String message) { + ValidateVolumeCapabilitiesResponse record = + Records.newRecord(ValidateVolumeCapabilitiesResponse.class); + record.setMessage(message); + record.setSupported(supported); + return record; + } + + public abstract void setSupported(boolean supported); + + public abstract boolean isSupported(); + + public abstract void setMessage(String message); + + public abstract String getMessage(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java new file mode 100644 index 00000000000..d61936429ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * PB wrapper for {@link CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest}. + */ +public class ValidateVolumeCapabilitiesRequestPBImpl extends + ValidateVolumeCapabilitiesRequest { + + private CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.Builder builder; + + public ValidateVolumeCapabilitiesRequestPBImpl( + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto) { + this.builder = proto.toBuilder(); + } + + public ValidateVolumeCapabilitiesRequestPBImpl() { + this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest + .newBuilder(); + } + + @Override + public String getVolumeId() { + Preconditions.checkNotNull(builder); + return builder.getVolumeId(); + } + + @Override + public void setVolumeAttributes(Map attributes) { + Preconditions.checkNotNull(builder); + builder.putAllVolumeAttributes(attributes); + } + + @Override + public void setVolumeId(String volumeId) { + Preconditions.checkNotNull(builder); + builder.setVolumeId(volumeId); + } + + @Override + public void addVolumeCapability(VolumeCapability volumeCapability) { + Preconditions.checkNotNull(builder); + CsiAdaptorProtos.VolumeCapability vc = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(CsiAdaptorProtos.VolumeCapability.AccessMode + .forNumber(volumeCapability.getAccessMode().ordinal())) + .setVolumeType(CsiAdaptorProtos.VolumeCapability.VolumeType + .forNumber(volumeCapability.getVolumeType().ordinal())) + .addAllMountFlags(volumeCapability.getMountFlags()) + .build(); + builder.addVolumeCapabilities(vc); + } + + @Override + public List getVolumeCapabilities() { + Preconditions.checkNotNull(builder); + List caps = new ArrayList<>( + builder.getVolumeCapabilitiesCount()); + builder.getVolumeCapabilitiesList().forEach(capability -> { + VolumeCapability vc = new VolumeCapability( + AccessMode.valueOf(capability.getAccessMode().name()), + VolumeType.valueOf(capability.getVolumeType().name()), + capability.getMountFlagsList()); + caps.add(vc); + }); + return caps; + } + + @Override + public Map getVolumeAttributes() { + Preconditions.checkNotNull(builder); + return builder.getVolumeAttributesMap(); + } + + public CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java new file mode 100644 index 00000000000..863ded58963 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; + +/** + * PB wrapper for {@link CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse}. + */ +public class ValidateVolumeCapabilitiesResponsePBImpl + extends ValidateVolumeCapabilitiesResponse { + + private CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.Builder builder; + + public ValidateVolumeCapabilitiesResponsePBImpl() { + this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse + .newBuilder(); + } + + public ValidateVolumeCapabilitiesResponsePBImpl( + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse response) { + this.builder = response.toBuilder(); + } + + @Override + public void setSupported(boolean supported) { + Preconditions.checkNotNull(builder); + this.builder.setSupported(supported); + } + + @Override + public boolean isSupported() { + Preconditions.checkNotNull(builder); + return builder.getSupported(); + } + + @Override + public void setMessage(String message) { + Preconditions.checkNotNull(builder); + this.builder.setMessage(message); + } + + @Override + public String getMessage() { + Preconditions.checkNotNull(builder); + return this.builder.getMessage(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse getProto() { + Preconditions.checkNotNull(builder); + return builder.build(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/package-info.java new file mode 100644 index 00000000000..4653e576afa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/impl/pb/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains CSI basic pb records. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/package-info.java new file mode 100644 index 00000000000..3800f6f46a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/api/protocolrecords/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains CSI basic pb records. + */ +package org.apache.hadoop.yarn.csi.api.protocolrecords; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java index 5bb9ce94412..014d7a3d760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.csi.client; import csi.v0.Csi.GetPluginInfoResponse; +import csi.v0.Csi.ValidateVolumeCapabilitiesRequest; +import csi.v0.Csi.ValidateVolumeCapabilitiesResponse; import java.io.IOException; @@ -36,4 +38,7 @@ * @throws IOException when unable to get plugin info from the driver. */ GetPluginInfoResponse getPluginInfo() throws IOException; + + ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + ValidateVolumeCapabilitiesRequest request) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java index 58dd292d943..5b3d2e23c31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.csi.client; +import csi.v0.Csi; import csi.v0.Csi.GetPluginInfoRequest; import csi.v0.Csi.GetPluginInfoResponse; import org.apache.hadoop.yarn.csi.utils.GrpcHelper; @@ -48,4 +49,14 @@ public GetPluginInfoResponse getPluginInfo() throws IOException { return client.createIdentityBlockingStub().getPluginInfo(request); } } + + @Override + public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + Csi.ValidateVolumeCapabilitiesRequest request) throws IOException { + try (CsiGrpcClient client = CsiGrpcClient.newBuilder() + .setDomainSocketAddress(address).build()) { + return client.createControllerBlockingStub() + .validateVolumeCapabilities(request); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformer.java new file mode 100644 index 00000000000..ead50baa9be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformer.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.utils; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * ProtoTransformer converts a YARN side message to CSI proto message + * and vice versa. Each CSI proto message should have a corresponding + * YARN side message implementation, and a transformer to convert them + * one to the other. This layer helps we to hide CSI spec messages + * from YARN components. + * + * @param YARN side internal messages + * @param CSI proto messages + */ +public interface ProtoTransformer { + + /** + * Convert message from type {@link A} to type {@link B}. + * @param messageA + * @return messageB + * @throws YarnException + */ + B convertTo(A messageA) throws YarnException; + + /** + * Convert message from type {@link B} to type {@link A}. + * @param messageB + * @return messageA + * @throws YarnException + */ + A convertFrom(B messageB) throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformerFactory.java new file mode 100644 index 00000000000..90b022ce393 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ProtoTransformerFactory.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.utils; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; + +/** + * Factory class to get desired proto transformer instance. + */ +public class ProtoTransformerFactory { + + /** + * Get a {@link ProtoTransformer} based on the given input message + * types. If the type is not supported, + * @param yarnProto yarn proto message + * @param csiProto CSI proto message + * @param yarn proto message + * @param CSI proto message + * @throws IllegalArgumentException + * when given types are not supported + * @return + */ + public static ProtoTransformer getTransformer( + Class yarnProto, Class csiProto) { + + System.out.println(yarnProto.getClass().isAssignableFrom( + ValidateVolumeCapabilitiesRequest.class)); + + if (yarnProto == ValidateVolumeCapabilitiesRequest.class && + csiProto == Csi.ValidateVolumeCapabilitiesRequest.class) { + return new ValidateVolumeCapabilitiesRequestProtoTransformer(); + } else if (yarnProto.getClass().isAssignableFrom( + ValidateVolumeCapabilitiesResponse.class) && + csiProto.getClass().isAssignableFrom( + Csi.ValidateVolumeCapabilitiesResponse.class)) { + return new ValidationVolumeCapabilitiesResponseProtoTransformer(); + } + throw new IllegalArgumentException("No transformer is found for type " + + yarnProto.getName() + " <-> " + csiProto.getName()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ValidateVolumeCapabilitiesRequestProtoTransformer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ValidateVolumeCapabilitiesRequestProtoTransformer.java new file mode 100644 index 00000000000..ce7faace746 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ValidateVolumeCapabilitiesRequestProtoTransformer.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.csi.utils; + +import csi.v0.Csi; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.csi.api.protocolrecords + .ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM; + +public class ValidateVolumeCapabilitiesRequestProtoTransformer + implements ProtoTransformer { + + @Override + public Csi.ValidateVolumeCapabilitiesRequest convertTo( + ValidateVolumeCapabilitiesRequest request) throws YarnException { + Csi.ValidateVolumeCapabilitiesRequest.Builder buidler = + Csi.ValidateVolumeCapabilitiesRequest.newBuilder(); + buidler.setVolumeId(request.getVolumeId()); + for (ValidateVolumeCapabilitiesRequest.VolumeCapability cap : + request.getVolumeCapabilities()) { + if (cap.getVolumeType() != FILE_SYSTEM) { + throw new YarnException("Invalid ValidateVolumeCapabilitiesRequest," + + " currently only FILE_SYSTEM volume type is supported"); + } + Csi.VolumeCapability.AccessMode accessMode = + Csi.VolumeCapability.AccessMode.newBuilder() + .setModeValue(cap.getAccessMode().ordinal()) + .build(); + Csi.VolumeCapability.MountVolume mountVolume = + Csi.VolumeCapability.MountVolume.newBuilder() + .addAllMountFlags(cap.getMountFlags()) + .build(); + Csi.VolumeCapability capability = + Csi.VolumeCapability.newBuilder() + .setAccessMode(accessMode) + .setMount(mountVolume) + .build(); + buidler.addVolumeCapabilities(capability); + } + return buidler.build(); + } + + @Override + public ValidateVolumeCapabilitiesRequest convertFrom( + Csi.ValidateVolumeCapabilitiesRequest request) throws YarnException { + ValidateVolumeCapabilitiesRequest result = ValidateVolumeCapabilitiesRequest + .newInstance(request.getVolumeId(), request.getVolumeAttributesMap()); + for (csi.v0.Csi.VolumeCapability csiCap : + request.getVolumeCapabilitiesList()) { + ValidateVolumeCapabilitiesRequest.AccessMode mode = + ValidateVolumeCapabilitiesRequest.AccessMode + .valueOf(csiCap.getAccessMode().getMode().name()); + if (!csiCap.hasMount()) { + throw new YarnException("Invalid request," + + " mount is not found in the request."); + } + List mountFlags = new ArrayList<>(); + for (int i=0; i + implements ProtoTransformer { + + @Override + public Csi.ValidateVolumeCapabilitiesResponse convertTo( + ValidateVolumeCapabilitiesResponse response) throws YarnException { + return Csi.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(response.isSupported()) + .setMessage(response.getMessage()) + .build(); + } + + @Override + public ValidateVolumeCapabilitiesResponse convertFrom( + Csi.ValidateVolumeCapabilitiesResponse response) throws YarnException { + return ValidateVolumeCapabilitiesResponse.newInstance( + response.getSupported(), response.getMessage()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/YarnCsiAdaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/YarnCsiAdaptor.proto new file mode 100644 index 00000000000..c951bb94f51 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/YarnCsiAdaptor.proto @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "CsiAdaptor"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_csi_adaptor.proto"; + +service CsiAdaptorService { + + rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest) + returns (ValidateVolumeCapabilitiesResponse); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/yarn_csi_adaptor.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/yarn_csi_adaptor.proto new file mode 100644 index 00000000000..e4a49051563 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/yarn_csi_adaptor.proto @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "CsiAdaptorProtos"; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +message ValidateVolumeCapabilitiesRequest { + required string volume_id = 1; + repeated VolumeCapability volume_capabilities = 2; + map volume_attributes = 3; +} + +message ValidateVolumeCapabilitiesResponse { + // True if the Plugin supports the specified capabilities for the + // given volume. This field is REQUIRED. + required bool supported = 1; + + // Message to the CO if `supported` above is false. This field is + // OPTIONAL. + // An empty string is equal to an unspecified field value. + optional string message = 2; +} + +message VolumeCapability { + enum VolumeType { + BLOCK = 0; + FILE_SYSTEM = 1; + } + + enum AccessMode { + UNKNOWN = 0; + SINGLE_NODE_WRITER = 1; + SINGLE_NODE_READER_ONLY = 2; + MULTI_NODE_READER_ONLY = 3; + MULTI_NODE_SINGLE_WRITER = 4; + MULTI_NODE_MULTI_WRITER = 5; + } + + required VolumeType volume_type = 1; + required AccessMode access_mode = 2; + repeated string mount_flags = 3; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java new file mode 100644 index 00000000000..750a31298e6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.csi.adaptor.impl.pb.client.CsiAdaptorPBClientImpl; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.yarn.csi.api.protocolrecords + .ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER; +import static org.apache.hadoop.yarn.csi.api.protocolrecords + .ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; + +/** + * UT for {@link CsiAdaptorService} + */ +public class TestCsiAdaptorService { + + private static File testRoot = null; + private static String domainSocket = null; + + @BeforeClass + public static void setUp() throws IOException { + testRoot = GenericTestUtils.getTestDir("csi-test"); + File socketPath = new File(testRoot, "csi.sock"); + FileUtils.forceMkdirParent(socketPath); + domainSocket = "unix://" + socketPath.getAbsolutePath(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (testRoot != null) { + FileUtils.deleteDirectory(testRoot); + } + } + + @Test + public void testService() throws IOException, YarnException { + InetSocketAddress address = new InetSocketAddress(0); + Configuration conf = new Configuration(); + conf.setSocketAddr("yarn.nodemanager.csi.adaptor.address", address); + CsiAdaptorService service = new CsiAdaptorService(domainSocket); + CsiAdaptorService adaptor = Mockito.spy(service); + doReturn(ValidateVolumeCapabilitiesResponsePBImpl + .newInstance(false, "capability not supported")).when(adaptor) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + + adaptor.init(new Configuration()); + adaptor.start(); + + try (CsiAdaptorPBClientImpl client = new CsiAdaptorPBClientImpl( + 1L, new InetSocketAddress("0.0.0.0", 9888), new Configuration())) { + ValidateVolumeCapabilitiesRequest request = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + new ValidateVolumeCapabilitiesRequest.VolumeCapability( + MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2")), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + ValidateVolumeCapabilitiesResponse response = client + .validateVolumeCapacity(request); + + Assert.assertEquals(false, response.isSupported()); + Assert.assertEquals("capability not supported", response.getMessage()); + } finally { + adaptor.stop(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java new file mode 100644 index 00000000000..b07b2a7d94d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.*; +import static org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.*; + +/** + * UT for message exchanges. + */ +public class TestValidateVolumeCapabilityRequest { + + @Test + public void testPBRecord() { + CsiAdaptorProtos.VolumeCapability vcProto = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(AccessMode.MULTI_NODE_MULTI_WRITER) + .setVolumeType(VolumeType.FILE_SYSTEM) + .addMountFlags("flag0") + .addMountFlags("flag1") + .build(); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto = + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.newBuilder() + .setVolumeId("volume-id-0000001") + .addVolumeCapabilities(vcProto) + .putVolumeAttributes("attr0", "value0") + .putVolumeAttributes("attr1", "value1") + .build(); + + ValidateVolumeCapabilitiesRequestPBImpl request = + new ValidateVolumeCapabilitiesRequestPBImpl(requestProto); + + Assert.assertEquals("volume-id-0000001", request.getVolumeId()); + Assert.assertEquals(2, request.getVolumeAttributes().size()); + Assert.assertEquals("value0", request.getVolumeAttributes().get("attr0")); + Assert.assertEquals("value1", request.getVolumeAttributes().get("attr1")); + Assert.assertEquals(1, request.getVolumeCapabilities().size()); + VolumeCapability vc = + request.getVolumeCapabilities().get(0); + Assert.assertEquals(MULTI_NODE_MULTI_WRITER, vc.getAccessMode()); + Assert.assertEquals(FILE_SYSTEM, vc.getVolumeType()); + Assert.assertEquals(2, vc.getMountFlags().size()); + + Assert.assertEquals(requestProto, request.getProto()); + } + + @Test + public void testNewInstance() { + ValidateVolumeCapabilitiesRequest pbImpl = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + new VolumeCapability(MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2")), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + Assert.assertEquals("volume-id-0000123", pbImpl.getVolumeId()); + Assert.assertEquals(1, pbImpl.getVolumeCapabilities().size()); + Assert.assertEquals(FILE_SYSTEM, + pbImpl.getVolumeCapabilities().get(0).getVolumeType()); + Assert.assertEquals(MULTI_NODE_MULTI_WRITER, + pbImpl.getVolumeCapabilities().get(0).getAccessMode()); + Assert.assertEquals(2, pbImpl.getVolumeAttributes().size()); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto = + ((ValidateVolumeCapabilitiesRequestPBImpl) pbImpl).getProto(); + Assert.assertEquals("volume-id-0000123", proto.getVolumeId()); + Assert.assertEquals(1, proto.getVolumeCapabilitiesCount()); + Assert.assertEquals(AccessMode.MULTI_NODE_MULTI_WRITER, + proto.getVolumeCapabilities(0).getAccessMode()); + Assert.assertEquals(VolumeType.FILE_SYSTEM, + proto.getVolumeCapabilities(0).getVolumeType()); + Assert.assertEquals(2, proto.getVolumeCapabilities(0) + .getMountFlagsCount()); + Assert.assertEquals(2, proto.getVolumeCapabilities(0) + .getMountFlagsList().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java new file mode 100644 index 00000000000..9bcc418f199 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.yarn.csi.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.csi.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.junit.Assert; +import org.junit.Test; + +/** + * UT for message exchanges. + */ +public class TestValidateVolumeCapabilityResponse { + + @Test + public void testPBRecord() { + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto = + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(true) + .setMessage("capability is supported") + .build(); + + ValidateVolumeCapabilitiesResponsePBImpl pbImpl = + new ValidateVolumeCapabilitiesResponsePBImpl(proto); + + Assert.assertEquals(true, pbImpl.isSupported()); + Assert.assertEquals("capability is supported", pbImpl.getMessage()); + Assert.assertEquals(proto, pbImpl.getProto()); + } + + @Test + public void testNewInstance() { + ValidateVolumeCapabilitiesResponse pbImpl = + ValidateVolumeCapabilitiesResponsePBImpl + .newInstance(false, "capability not supported"); + Assert.assertEquals(false, pbImpl.isSupported()); + Assert.assertEquals("capability not supported", pbImpl.getMessage()); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto = + ((ValidateVolumeCapabilitiesResponsePBImpl) pbImpl).getProto(); + Assert.assertEquals(false, proto.getSupported()); + Assert.assertEquals("capability not supported", proto.getMessage()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java new file mode 100644 index 00000000000..fed20c7def6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains classes for CSI adaptor testing. + */ +package org.apache.hadoop.yarn.csi.adaptor;