diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index affbe03..5e2278d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -96,6 +96,7 @@ server/resourcemanager_administration_protocol.proto application_history_client.proto server/application_history_server.proto + client_SCM_protocol.proto ${project.build.directory}/generated-sources/java diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java new file mode 100644 index 0000000..74efbe9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + *

+ * The protocol between clients and the SharedCacheManager to claim + * and release resources in the shared cache. + *

+ */ +@Public +@Stable +public interface ClientSCMProtocol { + /** + *

+ * The interface used by clients to claim a resource with the + * SharedCacheManager. The client uses a checksum to identify the + * resource and an {@link ApplicationId} to identify which application will be + * using the resource. + *

+ * + *

+ * The SharedCacheManager responds with whether or not the + * resource exists in the cache. If the resource exists, a Path + * to the resource in the shared cache is returned. If the resource does not + * exist, the response is empty. + *

+ * + * @param request request to claim a resource in the shared cache + * @return response indicating if the resource is already in the cache + * @throws YarnException + * @throws IOException + */ + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException; + + /** + *

+ * The interface used by clients to release a resource with the + * SharedCacheManager. This method is called once an application + * is no longer using a claimed resource in the shared cache. The client uses + * a checksum to identify the resource and an {@link ApplicationId} to + * identify which application is releasing the resource. + *

+ * + *

+ * Note: This method is an optimization and the client is not required to call + * it for correctness. + *

+ * + *

+ * Currently the SharedCacheManager sends an empty response. + *

+ * + * @param request request to release a resource in the shared cache + * @return (empty) response on releasing the resource + * @throws YarnException + * @throws IOException + */ + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, IOException; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java new file mode 100644 index 0000000..b0a9fb5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService; + +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB", + protocolVersion = 1) +public interface ClientSCMProtocolPB extends + ClientSCMProtocolService.BlockingInterface { + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java new file mode 100644 index 0000000..993451b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + *

The request from clients to release a resource in the shared cache.

+ */ +@Public +@Stable +public abstract class ReleaseSharedCacheResourceRequest { + + /** + * Get the ApplicationId of the resource to be released. + * + * @return ApplicationId + */ + @Public + @Stable + public abstract ApplicationId getAppId(); + + /** + * Set the ApplicationId of the resource to be released. + * + * @param id ApplicationId + */ + @Public + @Stable + public abstract void setAppId(ApplicationId id); + + /** + * Get the key of the resource to be released. + * + * @return key + */ + @Public + @Stable + public abstract String getResourceKey(); + + /** + * Set the key of the resource to be released. + * + * @param key unique identifier for the resource + */ + @Public + @Stable + public abstract void setResourceKey(String key); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java new file mode 100644 index 0000000..c36f53d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + *

+ * The response to clients from the SharedCacheManager when + * releasing a resource in the shared cache. + *

+ * + *

+ * Currently, this is empty. + *

+ */ +@Public +@Stable +public abstract class ReleaseSharedCacheResourceResponse { +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java new file mode 100644 index 0000000..a0869fe --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + *

+ * The request from clients to the SharedCacheManager that claims a + * resource in the shared cache. + *

+ */ +@Public +@Stable +public abstract class UseSharedCacheResourceRequest { + + /** + * Get the ApplicationId of the resource to be used. + * + * @return ApplicationId + */ + @Public + @Stable + public abstract ApplicationId getAppId(); + + /** + * Set the ApplicationId of the resource to be used. + * + * @param id ApplicationId + */ + @Public + @Stable + public abstract void setAppId(ApplicationId id); + + /** + * Get the key of the resource to be used. + * + * @return key + */ + @Public + @Stable + public abstract String getResourceKey(); + + /** + * Set the key of the resource to be used. + * + * @param key unique identifier for the resource + */ + @Public + @Stable + public abstract void setResourceKey(String key); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java new file mode 100644 index 0000000..35d1a70 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + *

+ * The response from the SharedCacheManager to the client that indicates whether + * a requested resource exists in the cache. + *

+ */ +@Public +@Stable +public abstract class UseSharedCacheResourceResponse { + + /** + * Get the Path corresponding to the requested resource in the + * shared cache. + * + * @return String A Path if the resource exists in the shared + * cache, null otherwise + */ + @Public + @Stable + public abstract String getPath(); + + /** + * Set the Path corresponding to a resource in the shared cache. + * + * @param p A Path corresponding to a resource in the shared + * cache + */ + @Public + @Stable + public abstract void setPath(String p); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d9a1918..593bd8b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1275,6 +1275,17 @@ + "nodemanager.thread-count"; public static final int DEFAULT_SCM_NM_THREAD_COUNT = 50; + /** The address of the client interface in the SCM. */ + public static final String SCM_ADDRESS = SCM_PREFIX + "client.address"; + public static final int DEFAULT_SCM_PORT = 8045; + public static final String DEFAULT_SCM_ADDRESS = "0.0.0.0:" + + DEFAULT_SCM_PORT; + + /** The number of threads used to handle shared cache manager requests. */ + public static final String SCM_CLIENT_THREAD_COUNT = SCM_PREFIX + + "client.thread-count"; + public static final int DEFAULT_SCM_CLIENT_THREAD_COUNT = 50; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto new file mode 100644 index 0000000..fbc3c42 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "ClientSCMProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; + +service ClientSCMProtocolService { + rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto); + rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto); +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a1f6d2e..4372cdb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -285,3 +285,24 @@ message GetContainersRequestProto { message GetContainersResponseProto { repeated ContainerReportProto containers = 1; } + +////////////////////////////////////////////////////// +/////// client_SCM_Protocol ////////////////////////// +////////////////////////////////////////////////////// + +message UseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message UseSharedCacheResourceResponseProto { + optional string path = 1; +} + +message ReleaseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message ReleaseSharedCacheResourceResponseProto { +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java new file mode 100644 index 0000000..58af3c4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; + +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol, + Closeable { + + private ClientSCMProtocolPB proxy; + + public ClientSCMProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException { + UseSharedCacheResourceRequestProto requestProto = + ((UseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new UseSharedCacheResourceResponsePBImpl(proxy.use(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + ReleaseSharedCacheResourceRequestProto requestProto = + ((ReleaseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java new file mode 100644 index 0000000..8c4e549 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB { + + private ClientSCMProtocol real; + + public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) { + this.real = impl; + } + + @Override + public UseSharedCacheResourceResponseProto use(RpcController controller, + UseSharedCacheResourceRequestProto proto) throws ServiceException { + UseSharedCacheResourceRequestPBImpl request = + new UseSharedCacheResourceRequestPBImpl(proto); + try { + UseSharedCacheResourceResponse response = real.use(request); + return ((UseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReleaseSharedCacheResourceResponseProto release( + RpcController controller, ReleaseSharedCacheResourceRequestProto proto) + throws ServiceException { + ReleaseSharedCacheResourceRequestPBImpl request = + new ReleaseSharedCacheResourceRequestPBImpl(proto); + try { + ReleaseSharedCacheResourceResponse response = real.release(request); + return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..6db61a1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder; + +public class ReleaseSharedCacheResourceRequestPBImpl extends + ReleaseSharedCacheResourceRequest { + ReleaseSharedCacheResourceRequestProto proto = + ReleaseSharedCacheResourceRequestProto.getDefaultInstance(); + ReleaseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public ReleaseSharedCacheResourceRequestPBImpl() { + builder = ReleaseSharedCacheResourceRequestProto.newBuilder(); + } + + public ReleaseSharedCacheResourceRequestPBImpl( + ReleaseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..559f2c8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; + +public class ReleaseSharedCacheResourceResponsePBImpl extends + ReleaseSharedCacheResourceResponse { + ReleaseSharedCacheResourceResponseProto proto = + ReleaseSharedCacheResourceResponseProto.getDefaultInstance(); + ReleaseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public ReleaseSharedCacheResourceResponsePBImpl() { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(); + } + + public ReleaseSharedCacheResourceResponsePBImpl( + ReleaseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..8499b9f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder; + +public class UseSharedCacheResourceRequestPBImpl extends + UseSharedCacheResourceRequest { + UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto + .getDefaultInstance(); + UseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public UseSharedCacheResourceRequestPBImpl() { + builder = UseSharedCacheResourceRequestProto.newBuilder(); + } + + public UseSharedCacheResourceRequestPBImpl( + UseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..5a9c14b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder; + +public class UseSharedCacheResourceResponsePBImpl extends + UseSharedCacheResourceResponse { + UseSharedCacheResourceResponseProto proto = + UseSharedCacheResourceResponseProto + .getDefaultInstance(); + UseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public UseSharedCacheResourceResponsePBImpl() { + builder = UseSharedCacheResourceResponseProto.newBuilder(); + } + + public UseSharedCacheResourceResponsePBImpl( + UseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getPath() { + UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasPath()) ? p.getPath() : null; + } + + @Override + public void setPath(String path) { + maybeInitBuilder(); + if (path == null) { + builder.clearPath(); + return; + } + builder.setPath(path); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9d2bce3..6de12e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1328,6 +1328,18 @@ 50 + + The address of the client interface in the SCM (shared cache manager) + yarn.sharedcache.manager.client.address + 0.0.0.0:8045 + + + + The number of threads used to handle shared cache manager requests from clients (50 by default) + yarn.sharedcache.manager.client.thread-count + 50 + + The interval that the yarn client library uses to poll the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java new file mode 100644 index 0000000..c48d56d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.sharedcache.CacheStructureUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.ResourceReference; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * This service handles all rpc calls from the client to the shared cache + * manager. + */ +public class ClientProtocolService extends AbstractService implements + ClientSCMProtocol { + + private static final Log LOG = LogFactory.getLog(ClientProtocolService.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress clientBindAddress; + private final SCMStore store; + private int cacheDepth; + private String cacheRoot; + private ClientSCMMetrics metrics; + + public ClientProtocolService(SCMStore store) { + super(ClientProtocolService.class.getName()); + this.store = store; + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + this.clientBindAddress = getBindAddress(conf); + + this.cacheDepth = CacheStructureUtil.getCacheDepth(conf); + + this.cacheRoot = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_PORT); + } + + @Override + protected synchronized void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = ClientSCMMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(ClientSCMProtocol.class, this, + clientBindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_CLIENT_THREAD_COUNT)); + + // TODO: Enable service authorization + + this.server.start(); + clientBindAddress = + conf.updateConnectAddr(YarnConfiguration.SCM_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, + IOException { + + UseSharedCacheResourceResponse response = + recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + String fileName = + this.store.addResourceReference(request.getResourceKey(), + request.getAppId(), callerUGI.getShortUserName()); + + if (fileName != null) { + response + .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName)); + this.metrics.incCacheHitCount(); + } else { + this.metrics.incCacheMissCount(); + } + + return response; + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + + ReleaseSharedCacheResourceResponse response = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + boolean removed = + this.store.removeResourceReference( + request.getResourceKey(), + new ResourceReference(request.getAppId(), callerUGI + .getShortUserName()), true); + + if (removed) { + this.metrics.incCacheRelease(); + } + + return response; + } + + private String getCacheEntryFilePath(String checksum, String filename) { + return CacheStructureUtil.getCacheEntryPath(this.cacheDepth, + this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 6ebb3c8..42468af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -84,6 +84,9 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { createNMCacheUploaderSCMProtocolService(store); addService(nms); + ClientProtocolService cps = createClientProtocolService(store); + addService(cps); + } catch (IOException e) { LOG.error("Encountered unexpected exception while initializing the shared cache manager", e); @@ -201,6 +204,10 @@ private NMCacheUploaderSCMProtocolService createNMCacheUploaderSCMProtocolServic return new NMCacheUploaderSCMProtocolService(store); } + private ClientProtocolService createClientProtocolService(SCMStore store) { + return new ClientProtocolService(store); + } + @Override protected synchronized void serviceStart() throws Exception { // Start metrics diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java new file mode 100644 index 0000000..5707b0c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining client requests metrics + * and publishing them through the metrics interfaces. + */ +@InterfaceAudience.Private +@Metrics(about="Client SCM metrics", context="yarn") +public class ClientSCMMetrics { + + static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class); + final MetricsRegistry registry; + + ClientSCMMetrics() { + registry = new MetricsRegistry("clientRequests"); + LOG.debug("Initialized "+ registry); + } + + enum Singleton { + INSTANCE; + + ClientSCMMetrics impl; + + synchronized ClientSCMMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static ClientSCMMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static ClientSCMMetrics getInstance() { + ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The ClientSCMMetrics singleton instance is not initialized." + + " Have you called init first?"); + return topMetrics; + } + + static ClientSCMMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + ClientSCMMetrics metrics = new ClientSCMMetrics(); + ms.register("clientRequests", null, metrics); + return metrics; + } + + @Metric("Number of cache hits") MutableCounterLong cacheHits; + @Metric("Number of cache misses") MutableCounterLong cacheMisses; + @Metric("Number of cache releases") MutableCounterLong cacheReleases; + + /** + * One cache hit event + */ + public void incCacheHitCount() { + cacheHits.incr(); + } + + /** + * One cache miss event + */ + public void incCacheMissCount() { + cacheMisses.incr(); + } + + /** + * One cache release event + */ + public void incCacheRelease() { + cacheReleases.incr(); + } + + public long getCacheHits() { return cacheHits.value(); } + public long getCacheMisses() { return cacheMisses.value(); } + public long getCacheReleases() { return cacheReleases.value(); } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java new file mode 100644 index 0000000..9a4fdca --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.net.InetSocketAddress; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Basic unit tests for the Client to SCM Protocol Service. + */ +public class TestClientSCMProtocolService { + + static ClientProtocolService service; + static ClientSCMProtocol clientSCMProxy; + static SCMStore store; + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @BeforeClass + public static void startUp() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCM_STORE_IMPL, InMemorySCMStore.class.getName()); + store = new InMemorySCMStore(new SCMContext()); + store.init(conf); + store.start(); + + service = new ClientProtocolService(store); + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(new Configuration()); + + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_ADDRESS, + YarnConfiguration.DEFAULT_SCM_PORT); + + clientSCMProxy = + (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress, + conf); + } + + @AfterClass + public static void cleanUp() { + if (store != null) { + store.stop(); + } + + if (service != null) { + service.stop(); + } + + if (clientSCMProxy != null) { + RPC.stopProxy(clientSCMProxy); + } + } + + @After + public void cleanUpTest() { + store.clearCache(); + } + + @Test + public void testUse_MissingEntry() throws Exception { + long misses = ClientSCMMetrics.getInstance().getCacheMisses(); + UseSharedCacheResourceRequest request = + recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(1, 1L)); + Assert.assertNull(clientSCMProxy.use(request).getPath()); + Assert.assertEquals( + "Client SCM metrics aren't updated.", 1, + ClientSCMMetrics.getInstance().getCacheMisses() - misses); + } + + @Test + public void testUse_ExistingEntry_NoAppIds() throws Exception { + // Pre-populate the SCM with one cache entry + store.addKey("key1", "foo.jar"); + + long hits = ClientSCMMetrics.getInstance().getCacheHits(); + + UseSharedCacheResourceRequest request = + recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(2, 2L)); + // Expecting default depth of 3 and rootdir of /sharedcache + String expectedPath = "/sharedcache/k/e/y/key1/foo.jar"; + Assert.assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + Assert.assertEquals( + "Client SCM metrics aren't updated.", 1, + ClientSCMMetrics.getInstance().getCacheHits() - hits); + + } + + @Test + public void testUse_ExistingEntry_OneId() throws Exception { + // Pre-populate the SCM with one cache entry + store.addKey("key1", "foo.jar"); + store.addResourceReference("key1", createAppId(1, 1L), "user"); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + long hits = ClientSCMMetrics.getInstance().getCacheHits(); + + // Add a new distinct appId + UseSharedCacheResourceRequest request = + recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(2, 2L)); + + // Expecting default depth of 3 and rootdir of /sharedcache + String expectedPath = "/sharedcache/k/e/y/key1/foo.jar"; + Assert.assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); + Assert.assertEquals(2, store.getResourceReferences("key1").size()); + Assert.assertEquals( + "Client SCM metrics aren't updated.", + 1, ClientSCMMetrics.getInstance().getCacheHits() - hits); + } + + @Test + public void testUse_ExistingEntry_DupId() throws Exception { + // Pre-populate the SCM with one cache entry + store.addKey("key1", "foo.jar"); + UserGroupInformation testUGI = UserGroupInformation.getCurrentUser(); + store.addResourceReference("key1", createAppId(1, 1L), + testUGI.getShortUserName()); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + + long hits = ClientSCMMetrics.getInstance().getCacheHits(); + + // Add a new duplicate appId + UseSharedCacheResourceRequest request = + recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(1, 1L)); + + // Expecting default depth of 3 and rootdir of /sharedcache + String expectedPath = "/sharedcache/k/e/y/key1/foo.jar"; + Assert.assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + + Assert.assertEquals( + "Client SCM metrics aren't updated.", + 1, ClientSCMMetrics.getInstance().getCacheHits() - hits); + } + + @Test + public void testRelease_ExistingEntry_NonExistantAppId() throws Exception { + // Pre-populate the SCM with one cache entry + store.addKey("key1", "foo.jar"); + store.addResourceReference("key1", createAppId(1, 1L), "user"); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + + long releases = ClientSCMMetrics.getInstance().getCacheReleases(); + + ReleaseSharedCacheResourceRequest request = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(2, 2L)); + clientSCMProxy.release(request); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + + Assert.assertEquals( + "Client SCM metrics were updated when a release did not happen", 0, + ClientSCMMetrics.getInstance().getCacheReleases() - releases); + + } + + @Test + public void testRelease_ExistingEntry_WithAppId() throws Exception { + // Pre-populate the SCM with one cache entry + store.addKey("key1", "foo.jar"); + UserGroupInformation testUGI = UserGroupInformation.getCurrentUser(); + store.addResourceReference("key1", createAppId(1, 1L), + testUGI.getShortUserName()); + Assert.assertEquals(1, store.getResourceReferences("key1").size()); + + long releases = ClientSCMMetrics.getInstance().getCacheReleases(); + + ReleaseSharedCacheResourceRequest request = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceRequest.class); + request.setResourceKey("key1"); + request.setAppId(createAppId(1, 1L)); + clientSCMProxy.release(request); + Assert.assertEquals(0, store.getResourceReferences("key1").size()); + + Assert.assertEquals( + "Client SCM metrics aren't updated.", + 1, ClientSCMMetrics.getInstance().getCacheReleases() - releases); + + } + + @Test + public void testRelease_MissingEntry() throws Exception { + + long releases = ClientSCMMetrics.getInstance().getCacheReleases(); + + ReleaseSharedCacheResourceRequest request = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceRequest.class); + request.setResourceKey("key2"); + request.setAppId(createAppId(2, 2L)); + clientSCMProxy.release(request); + Assert.assertNotNull(store.getResourceReferences("key2")); + Assert.assertEquals(0, store.getResourceReferences("key2").size()); + Assert.assertEquals( + "Client SCM metrics were updated when a release did not happen.", + 0, ClientSCMMetrics.getInstance().getCacheReleases() - releases); + } + + private ApplicationId createAppId(int id, long timestamp) { + return ApplicationId.newInstance(timestamp, id); + } +}