+ * The protocol between a NodeManager's
+ * SharedCacheUploadService and the
+ * SharedCacheManager.
+ *
+ * The method used by the NodeManager's SharedCacheUploadService
+ * to notify the shared cache manager of a newly cached resource.
+ *
+ * The SharedCacheManager responds with whether or not the
+ * NodeManager should delete the uploaded file.
+ *
+ * The request from clients to the SharedCacheManager that claims a
+ * resource in the shared cache.
+ *
key of the resource that was just uploaded to the
+ * shared cache.
+ *
+ * @return key
+ */
+ @Public
+ @Stable
+ public abstract String getResourceKey();
+
+ /**
+ * Set the key of the resource that was just uploaded to the
+ * shared cache.
+ *
+ * @param key unique identifier for the resource
+ */
+ @Public
+ @Stable
+ public abstract void setResourceKey(String key);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java
new file mode 100644
index 0000000..c17b430
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NotifySCMResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+
+/**
+ * + * The response from the SharedCacheManager to the NodeManager that indicates + * whether the NodeManager needs to delete the cached resource it was sending + * the notification for. + *
+ */ +@Public +@Stable +public abstract class NotifySCMResponse { + + /** + * Get whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @return boolean True if the resource has been accepted, false otherwise. + */ + @Public + @Stable + public abstract boolean getAccepted(); + + /** + * Set whether or not the shared cache manager has accepted the notified + * resource (i.e. the uploaded file should remain in the cache). + * + * @param b True if the resource has been accepted, false otherwise. + */ + @Public + @Stable + public abstract void setAccepted(boolean b); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java new file mode 100644 index 0000000..6bddb96 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMRequestPBImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; + +public class NotifySCMRequestPBImpl extends + NotifySCMRequest { + NotifySCMRequestProto proto = NotifySCMRequestProto + .getDefaultInstance(); + NotifySCMRequestProto.Builder builder = null; + boolean viaProto = false; + + public NotifySCMRequestPBImpl() { + builder = NotifySCMRequestProto.newBuilder(); + } + + public NotifySCMRequestPBImpl(NotifySCMRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public NotifySCMRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getResourceKey() { + NotifySCMRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + @Override + public String getFileName() { + NotifySCMRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasFilename()) ? p.getFilename() : null; + } + + @Override + public void setFilename(String filename) { + maybeInitBuilder(); + if (filename == null) { + builder.clearFilename(); + return; + } + builder.setFilename(filename); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NotifySCMRequestProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java new file mode 100644 index 0000000..cb5f978 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NotifySCMResponsePBImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NotifySCMResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; + +public class NotifySCMResponsePBImpl extends + NotifySCMResponse { + NotifySCMResponseProto proto = NotifySCMResponseProto + .getDefaultInstance(); + NotifySCMResponseProto.Builder builder = null; + boolean viaProto = false; + + public NotifySCMResponsePBImpl() { + builder = NotifySCMResponseProto.newBuilder(); + } + + public NotifySCMResponsePBImpl( +NotifySCMResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public NotifySCMResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public boolean getAccepted() { + NotifySCMResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasAccepted()) ? p.getAccepted() : null; + } + + @Override + public void setAccepted(boolean b) { + maybeInitBuilder(); + builder.setAccepted(b); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = NotifySCMResponseProto.newBuilder(proto); + } + viaProto = false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto new file mode 100644 index 0000000..9e4c33d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/NMCacheUploader_SCM_protocol.proto @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "NMCacheUploaderSCMProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_server_common_service_protos.proto"; + +service NMCacheUploaderSCMProtocolService { + rpc notify(NotifySCMRequestProto) returns (NotifySCMResponseProto); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index aab4383..d4585c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -68,3 +68,12 @@ message NMContainerStatusProto { optional string diagnostics = 5 [default = "N/A"]; optional int32 container_exit_status = 6; } + +message NotifySCMRequestProto { + optional string resource_key = 1; + optional string filename = 2; +} + +message NotifySCMResponseProto { + optional bool accepted = 1; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java new file mode 100644 index 0000000..472aaa8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/NMCacheUploaderSCMProtocolService.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMResponse; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.NMCacheUploaderSCMProtocolMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +/** + * This service handles all rpc calls from the NodeManager uploader to the + * shared cache manager. + */ +public class NMCacheUploaderSCMProtocolService extends AbstractService implements + NMCacheUploaderSCMProtocol { + + private static final Log LOG = LogFactory.getLog(NMCacheUploaderSCMProtocolService.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress bindAddress; + private final SCMStore store; + private NMCacheUploaderSCMProtocolMetrics metrics; + + public NMCacheUploaderSCMProtocolService(SCMStore store) { + super(NMCacheUploaderSCMProtocolService.class.getName()); + this.store = store; + } + + @Override + protected synchronized void serviceInit(Configuration conf) throws Exception { + this.bindAddress = getBindAddress(conf); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_PORT); + } + + @Override + protected synchronized void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = NMCacheUploaderSCMProtocolMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(NMCacheUploaderSCMProtocol.class, this, bindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_NM_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_NM_THREAD_COUNT)); + + // TODO: Enable service authorization + + this.server.start(); + bindAddress = + conf.updateConnectAddr(YarnConfiguration.NM_SCM_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected synchronized void serviceStop() throws Exception { + if (this.server != null) { + this.server.stop(); + } + + super.serviceStop(); + } + + @Override + public NotifySCMResponse notify(NotifySCMRequest request) + throws YarnException, IOException { + NotifySCMResponse response = recordFactory.newRecordInstance(NotifySCMResponse.class); + + // TODO: Security/authorization + + String filename = store.addKey(request.getResourceKey(), request.getFileName()); + + boolean accepted = filename.equals(request.getFileName()); + + if (accepted) { + this.metrics.incAcceptedUploads(); + } else { + this.metrics.incRejectedUploads(); + } + + response.setAccepted(accepted); + + return response; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 4c733aa..6ebb3c8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -80,6 +80,10 @@ protected synchronized void serviceInit(Configuration conf) throws Exception { CleanerService cs = createCleanerService(appChecker, store, context); addService(cs); + NMCacheUploaderSCMProtocolService nms = + createNMCacheUploaderSCMProtocolService(store); + addService(nms); + } catch (IOException e) { LOG.error("Encountered unexpected exception while initializing the shared cache manager", e); @@ -192,6 +196,11 @@ private CleanerService createCleanerService(AppChecker appChecker, return new CleanerService(appChecker, store, context); } + private NMCacheUploaderSCMProtocolService createNMCacheUploaderSCMProtocolService( + SCMStore store) { + return new NMCacheUploaderSCMProtocolService(store); + } + @Override protected synchronized void serviceStart() throws Exception { // Start metrics diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java new file mode 100644 index 0000000..14c7354 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/NMCacheUploaderSCMProtocolMetrics.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining NM uploader requests metrics + * and publishing them through the metrics interfaces. + */ +@InterfaceAudience.Private +@Metrics(about="NM cache upload metrics", context="yarn") +public class NMCacheUploaderSCMProtocolMetrics { + + static final Log LOG = LogFactory.getLog(NMCacheUploaderSCMProtocolMetrics.class); + final MetricsRegistry registry; + + NMCacheUploaderSCMProtocolMetrics() { + registry = new MetricsRegistry("NMUploadRequests"); + LOG.debug("Initialized "+ registry); + } + + enum Singleton { + INSTANCE; + + NMCacheUploaderSCMProtocolMetrics impl; + + synchronized NMCacheUploaderSCMProtocolMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static NMCacheUploaderSCMProtocolMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static NMCacheUploaderSCMProtocolMetrics getInstance() { + NMCacheUploaderSCMProtocolMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) + throw new IllegalStateException( + "The NMCacheUploaderSCMProtocolMetrics singleton instance is not initialized." + + " Have you called init first?"); + return topMetrics; + } + + static NMCacheUploaderSCMProtocolMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + NMCacheUploaderSCMProtocolMetrics metrics = new NMCacheUploaderSCMProtocolMetrics(); + ms.register("NMUploaderRequests", null, metrics); + return metrics; + } + + @Metric("Number of accepted uploads") MutableCounterLong acceptedUploads; + @Metric("Number of rejected uploads") MutableCounterLong rejectedUploads; + + /** + * One accepted upload event + */ + public void incAcceptedUploads() { + acceptedUploads.incr(); + } + + /** + * One rejected upload event + */ + public void incRejectedUploads() { + rejectedUploads.incr(); + } + + public long getAcceptedUploads() { return acceptedUploads.value(); } + public long getRejectUploads() { return rejectedUploads.value(); } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java new file mode 100644 index 0000000..a649a10 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestNMCacheUploaderSCMProtocolService.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.NMCacheUploaderSCMProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.NotifySCMRequest; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.NMCacheUploaderSCMProtocolMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.ResourceReference; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +/** + * Basic unit tests for the NodeManger to SCM Protocol Service. + */ +public class TestNMCacheUploaderSCMProtocolService { + + static NMCacheUploaderSCMProtocolService service; + static NMCacheUploaderSCMProtocol proxy; + static SCMStore store; + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + @BeforeClass + public static void startUp() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.SCM_STORE_IMPL, InMemorySCMStore.class.getName()); + store = new InMemorySCMStore(new SCMContext()); + store.init(conf); + store.start(); + + service = new NMCacheUploaderSCMProtocolService(store); + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(new Configuration()); + + InetSocketAddress scmAddress = + conf.getSocketAddr(YarnConfiguration.NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_ADDRESS, + YarnConfiguration.DEFAULT_NM_SCM_PORT); + + proxy = + (NMCacheUploaderSCMProtocol) rpc.getProxy( + NMCacheUploaderSCMProtocol.class, scmAddress, conf); + } + + @AfterClass + public static void cleanUp() { + if (store != null) { + store.stop(); + } + + if (service != null) { + service.stop(); + } + + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + + @After + public void cleanUpTest() { + store.clearCache(); + } + + @Test + public void testNotify_noEntry() throws Exception { + long accepted = + NMCacheUploaderSCMProtocolMetrics.getInstance().getAcceptedUploads(); + + NotifySCMRequest request = + recordFactory.newRecordInstance(NotifySCMRequest.class); + request.setResourceKey("key1"); + request.setFilename("foo.jar"); + assertTrue(proxy.notify(request).getAccepted()); + Collection