diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d552e9c..a8672ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -1442,6 +1441,21 @@ private static void addDeprecatedKeys() { SCM_CLEANER_PREFIX + "resource-sleep-ms"; public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L; + /** The address of the node manager interface in the SCM. */ + public static final String NM_SCM_ADDRESS = SHARED_CACHE_PREFIX + + "nodemanager.address"; + public static final int DEFAULT_NM_SCM_PORT = 8046; + public static final String DEFAULT_NM_SCM_ADDRESS = "0.0.0.0:" + + DEFAULT_NM_SCM_PORT; + + /** + * The number of SCM threads used to handle notify requests from the node + * manager. + */ + public static final String SCM_NM_THREAD_COUNT = SHARED_CACHE_PREFIX + + "nodemanager.thread-count"; + public static final int DEFAULT_SCM_NM_THREAD_COUNT = 50; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e364ba8..12d4660 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1419,6 +1419,20 @@ 0 + + The address of the node manager interface in the SCM + (shared cache manager) + yarn.sharedcache.nodemanager.address + 0.0.0.0:8046 + + + + The number of threads used to handle shared cache manager + requests from the node manager (50 by default) + yarn.sharedcache.nodemanager.thread-count + 50 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index acf330f..ca01d31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -135,6 +135,7 @@ yarn_server_common_service_protos.proto yarn_server_common_service_protos.proto ResourceTracker.proto + NMCacheUploader_SCM_protocol.proto ${project.build.directory}/generated-sources/java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocol.java new file mode 100644 index 0000000..0c5a353 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocol.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; + +/** + *

+ * The protocol between a NodeManager's + * SharedCacheUploadService and the + * SharedCacheManager. + *

+ */ +@Private +@Unstable +public interface NMCacheUploaderSCMProtocol { + /** + *

+ * The method used by the NodeManager's SharedCacheUploadService + * to notify the shared cache manager of a newly cached resource. + *

+ * + *

+ * The SharedCacheManager responds with whether or not the + * NodeManager should delete the uploaded file. + *

+ * + * @param request notify the shared cache manager of a newly uploaded resource + * to the shared cache + * @return response indicating if the newly uploaded resource should be + * deleted + * @throws YarnException + * @throws IOException + */ + public NotifySCMResponse notify(NotifySCMRequest request) + throws YarnException, IOException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocolPB.java new file mode 100644 index 0000000..13e055b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/NMCacheUploaderSCMProtocolPB.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.NMCacheUploaderSCMProtocol.NMCacheUploaderSCMProtocolService; + +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocolPB", + protocolVersion = 1) +public interface NMCacheUploaderSCMProtocolPB extends + NMCacheUploaderSCMProtocolService.BlockingInterface { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/NMCacheUploaderSCMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/NMCacheUploaderSCMProtocolPBClientImpl.java new file mode 100644 index 0000000..1cebf6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/NMCacheUploaderSCMProtocolPBClientImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProto; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NotifySCMRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NotifySCMResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class NMCacheUploaderSCMProtocolPBClientImpl implements + NMCacheUploaderSCMProtocol, Closeable { + + private NMCacheUploaderSCMProtocolPB proxy; + + public NMCacheUploaderSCMProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, NMCacheUploaderSCMProtocolPB.class, + ProtobufRpcEngine.class); + proxy = + RPC.getProxy(NMCacheUploaderSCMProtocolPB.class, clientVersion, addr, + conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override + public NotifySCMResponse notify(NotifySCMRequest request) + throws YarnException, IOException { + NotifySCMRequestProto requestProto = + ((NotifySCMRequestPBImpl) request).getProto(); + try { + return new NotifySCMResponsePBImpl(proxy.notify(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/NMCacheUploaderSCMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/NMCacheUploaderSCMProtocolPBServiceImpl.java new file mode 100644 index 0000000..f178079 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/NMCacheUploaderSCMProtocolPBServiceImpl.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMResponseProto; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NotifySCMRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NotifySCMResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class NMCacheUploaderSCMProtocolPBServiceImpl implements + NMCacheUploaderSCMProtocolPB { + + private NMCacheUploaderSCMProtocol real; + + public NMCacheUploaderSCMProtocolPBServiceImpl(NMCacheUploaderSCMProtocol impl) { + this.real = impl; + } + + @Override + public NotifySCMResponseProto notify(RpcController controller, + NotifySCMRequestProto proto) throws ServiceException { + NotifySCMRequestPBImpl request = new NotifySCMRequestPBImpl(proto); + try { + NotifySCMResponse response = real.notify(request); + return ((NotifySCMResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMRequest.java new file mode 100644 index 0000000..f3027c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMRequest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The request from clients to the SharedCacheManager that claims a + * resource in the shared cache. + *

+ */ +@Private +@Unstable +public abstract class NotifySCMRequest { + + /** + * Get the filename of the resource that was just uploaded to the shared + * cache. + * + * @return the filename + */ + public abstract String getFileName(); + + /** + * Set the filename of the resource that was just uploaded to the shared + * cache. + * + * @param filename the name of the file + */ + public abstract void setFilename(String filename); + + /** + * Get the key of the resource that was just uploaded to the + * shared cache. + * + * @return key + */ + public abstract String getResourceKey(); + + /** + * Set the key of the resource that was just uploaded to the + * shared cache. + * + * @param key unique identifier for the resource + */ + public abstract void setResourceKey(String key); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java new file mode 100644 index 0000000..c7c44f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * The response from the SharedCacheManager to the NodeManager that indicates + * whether the NodeManager needs to delete the cached resource it was sending + * the notification for. + *

+ */ +@Private +@Unstable +public abstract class NotifySCMResponse { + + /** + * Get whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @return boolean True if the resource has been accepted, false otherwise. + */ + public abstract boolean getAccepted(); + + /** + * Set whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @param b True if the resource has been accepted, false otherwise. + */ + public abstract void setAccepted(boolean b); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java new file mode 100644 index 0000000..1891d5e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; + +public class NotifySCMRequestPBImpl extends + NotifySCMRequest { + NotifySCMRequestProto proto = NotifySCMRequestProto + .getDefaultInstance(); + NotifySCMRequestProto.Builder builder = null; + boolean viaProto = false; + + public NotifySCMRequestPBImpl() { + builder = NotifySCMRequestProto.newBuilder(); + } + + public NotifySCMRequestPBImpl(NotifySCMRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public NotifySCMRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getResourceKey() { + NotifySCMRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + @Override + public String getFileName() { + NotifySCMRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasFilename()) ? p.getFilename() : null; + } + + @Override + public void setFilename(String filename) { + maybeInitBuilder(); + if (filename == null) { + builder.clearFilename(); + return; + } + builder.setFilename(filename); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NotifySCMRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java new file mode 100644 index 0000000..3fc4836 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; + +public class NotifySCMResponsePBImpl extends + NotifySCMResponse { + NotifySCMResponseProto proto = NotifySCMResponseProto + .getDefaultInstance(); + NotifySCMResponseProto.Builder builder = null; + boolean viaProto = false; + + public NotifySCMResponsePBImpl() { + builder = NotifySCMResponseProto.newBuilder(); + } + + public NotifySCMResponsePBImpl( +NotifySCMResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public NotifySCMResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public boolean getAccepted() { + NotifySCMResponseProtoOrBuilder p = viaProto ? proto : builder; + // Default to true, when in doubt just leave the file in the cache + return (p.hasAccepted()) ? p.getAccepted() : true; + } + + @Override + public void setAccepted(boolean b) { + maybeInitBuilder(); + builder.setAccepted(b); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NotifySCMResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto new file mode 100644 index 0000000..9e4c33d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "NMCacheUploaderSCMProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_server_common_service_protos.proto"; + +service NMCacheUploaderSCMProtocolService { + rpc notify(NotifySCMRequestProto) returns (NotifySCMResponseProto); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d0990fb..baace1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -69,4 +69,13 @@ message NMContainerStatusProto { optional string diagnostics = 5 [default = "N/A"]; optional int32 container_exit_status = 6; optional int64 creation_time = 7; -} \ No newline at end of file +} + +message NotifySCMRequestProto { + optional string resource_key = 1; + optional string filename = 2; +} + +message NotifySCMResponseProto { + optional bool accepted = 1; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java new file mode 100644 index 0000000..2ad4389 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.NMCacheUploaderSCMProtocolMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * This service handles all rpc calls from the NodeManager uploader to the + * shared cache manager. + */ +public class NMCacheUploaderSCMProtocolService extends AbstractService + implements NMCacheUploaderSCMProtocol { + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress bindAddress; + private final SCMStore store; + private NMCacheUploaderSCMProtocolMetrics metrics; + + public NMCacheUploaderSCMProtocolService(SCMStore store) { + super(NMCacheUploaderSCMProtocolService.class.getName()); + this.store = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.bindAddress = getBindAddress(conf); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = NMCacheUploaderSCMProtocolMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(NMCacheUploaderSCMProtocol.class, this, bindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_NM_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_NM_THREAD_COUNT)); + + // TODO: Enable service authorization + + this.server.start(); + bindAddress = + conf.updateConnectAddr(YarnConfiguration.NM_SCM_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + @Override + public NotifySCMResponse notify(NotifySCMRequest request) + throws YarnException, IOException { + NotifySCMResponse response = + recordFactory.newRecordInstance(NotifySCMResponse.class); + + // TODO: Security/authorization + + String filename = + store.addResource(request.getResourceKey(), request.getFileName()); + + boolean accepted = filename.equals(request.getFileName()); + + if (accepted) { + this.metrics.incAcceptedUploads(); + } else { + this.metrics.incRejectedUploads(); + } + + response.setAccepted(accepted); + + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 3fdb588..4d65b6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -67,6 +67,10 @@ protected void serviceInit(Configuration conf) throws Exception { CleanerService cs = createCleanerService(store); addService(cs); + NMCacheUploaderSCMProtocolService nms = + createNMCacheUploaderSCMProtocolService(store); + addService(nms); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -97,6 +101,11 @@ private CleanerService createCleanerService(SCMStore store) { return new CleanerService(store); } + private NMCacheUploaderSCMProtocolService + createNMCacheUploaderSCMProtocolService(SCMStore store) { + return new NMCacheUploaderSCMProtocolService(store); + } + @Override protected void serviceStop() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java new file mode 100644 index 0000000..c1f03ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining NM uploader requests metrics + * and publishing them through the metrics interfaces. + */ +@Private +@Evolving +@Metrics(about="NM cache upload metrics", context="yarn") +public class NMCacheUploaderSCMProtocolMetrics { + + static final Log LOG = + LogFactory.getLog(NMCacheUploaderSCMProtocolMetrics.class); + final MetricsRegistry registry; + + NMCacheUploaderSCMProtocolMetrics() { + registry = new MetricsRegistry("NMUploadRequests"); + LOG.debug("Initialized "+ registry); + } + + enum Singleton { + INSTANCE; + + NMCacheUploaderSCMProtocolMetrics impl; + + synchronized NMCacheUploaderSCMProtocolMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static NMCacheUploaderSCMProtocolMetrics + initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static NMCacheUploaderSCMProtocolMetrics getInstance() { + NMCacheUploaderSCMProtocolMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The NMCacheUploaderSCMProtocolMetrics singleton instance is not" + + "initialized. Have you called init first?"); + return topMetrics; + } + + static NMCacheUploaderSCMProtocolMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + NMCacheUploaderSCMProtocolMetrics metrics = + new NMCacheUploaderSCMProtocolMetrics(); + ms.register("NMUploaderRequests", null, metrics); + return metrics; + } + + @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads; + @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads; + + /** + * One accepted upload event + */ + public void incAcceptedUploads() { + acceptedUploads.incr(); + } + + /** + * One rejected upload event + */ + public void incRejectedUploads() { + rejectedUploads.incr(); + } + + public long getAcceptedUploads() { return acceptedUploads.value(); } + public long getRejectUploads() { return rejectedUploads.value(); } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java new file mode 100644 index 0000000..d984b37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.NMCacheUploaderSCMProtocolMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Basic unit tests for the NodeManger to SCM Protocol Service. + */ +public class TestNMCacheUploaderSCMProtocolService { + private static File testDir = null; + + @BeforeClass + public static void setupTestDirs() throws IOException { + testDir = new File("target", + TestNMCacheUploaderSCMProtocolService.class.getCanonicalName()); + testDir.delete(); + testDir.mkdirs(); + testDir = testDir.getAbsoluteFile(); + } + + @AfterClass + public static void cleanupTestDirs() throws IOException { + if (testDir != null) { + testDir.delete(); + } + } + + private NMCacheUploaderSCMProtocolService service; + private NMCacheUploaderSCMProtocol proxy; + private SCMStore store; + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @Before + public void startUp() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCM_STORE_CLASS, + InMemorySCMStore.class.getName()); + conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath()); + AppChecker appChecker = mock(AppChecker.class); + store = new InMemorySCMStore(appChecker); + store.init(conf); + store.start(); + + service = new NMCacheUploaderSCMProtocolService(store); + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(new Configuration()); + + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_PORT); + + proxy = + (NMCacheUploaderSCMProtocol) rpc.getProxy( + NMCacheUploaderSCMProtocol.class, scmAddress, conf); + } + + @After + public void cleanUp() { + if (store != null) { + store.stop(); + } + + if (service != null) { + service.stop(); + } + + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + + @Test + public void testNotify_noEntry() throws Exception { + long accepted = + NMCacheUploaderSCMProtocolMetrics.getInstance().getAcceptedUploads(); + + NotifySCMRequest request = + recordFactory.newRecordInstance(NotifySCMRequest.class); + request.setResourceKey("key1"); + request.setFilename("foo.jar"); + assertTrue(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + + assertEquals( + "NM upload metrics aren't updated.", 1, + NMCacheUploaderSCMProtocolMetrics.getInstance().getAcceptedUploads() - + accepted); + + } + + @Test + public void testNotify_entryExists_differentName() throws Exception { + + long rejected = + NMCacheUploaderSCMProtocolMetrics.getInstance().getRejectUploads(); + + store.addResource("key1", "foo.jar"); + NotifySCMRequest request = + recordFactory.newRecordInstance(NotifySCMRequest.class); + request.setResourceKey("key1"); + request.setFilename("foobar.jar"); + assertFalse(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + assertEquals( + "NM upload metrics aren't updated.", 1, + NMCacheUploaderSCMProtocolMetrics.getInstance().getRejectUploads() - + rejected); + + } + + @Test + public void testNotify_entryExists_sameName() throws Exception { + + long accepted = + NMCacheUploaderSCMProtocolMetrics.getInstance().getAcceptedUploads(); + + store.addResource("key1", "foo.jar"); + NotifySCMRequest request = + recordFactory.newRecordInstance(NotifySCMRequest.class); + request.setResourceKey("key1"); + request.setFilename("foo.jar"); + assertTrue(proxy.notify(request).getAccepted()); + Collection set = + store.getResourceReferences("key1"); + assertNotNull(set); + assertEquals(0, set.size()); + assertEquals( + "NM upload metrics aren't updated.", 1, + NMCacheUploaderSCMProtocolMetrics.getInstance().getAcceptedUploads() - + accepted); + + } +}