diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index b5b7a43..3ddce5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.server.services.MicroZookeeperService; +import org.apache.hadoop.registry.server.services.MicroZookeeperServiceKeys; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -106,7 +109,7 @@ private String[] rmIds; private ApplicationHistoryServer appHistoryServer; - + private MicroZookeeperService zookeeper; private boolean useFixedPorts; private boolean useRpc = false; private int failoverTimeout; @@ -121,6 +124,7 @@ // Number of nm-log-dirs per nodemanager private int numLogDirs; private boolean enableAHS; + private final boolean enableRegistry; /** * @param testName name of the test @@ -129,14 +133,17 @@ * @param numLocalDirs the number of nm-local-dirs per nodemanager * @param numLogDirs the number of nm-log-dirs per nodemanager * @param enableAHS enable ApplicationHistoryServer or not + * @param enableRegistry enable the registry or not */ public MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, - int numLocalDirs, int numLogDirs, boolean enableAHS) { + int numLocalDirs, int numLogDirs, boolean enableAHS, + boolean enableRegistry) { super(testName.replace("$", "")); this.numLocalDirs = numLocalDirs; this.numLogDirs = numLogDirs; this.enableAHS = enableAHS; + this.enableRegistry = enableRegistry; String testSubDir = testName.replace("$", ""); File targetWorkDir = new File("target", testSubDir); try { @@ -192,6 +199,21 @@ public MiniYARNCluster( * @param numNodeManagers the number of node managers in the cluster * @param numLocalDirs the number of nm-local-dirs per nodemanager * @param numLogDirs the number of nm-log-dirs per nodemanager + * @param enableAHS enable ApplicationHistoryServer or not + */ + public MiniYARNCluster( + String testName, int numResourceManagers, int numNodeManagers, + int numLocalDirs, int numLogDirs, boolean enableAHS) { + this(testName, numResourceManagers, numNodeManagers, numLocalDirs, + numLogDirs, enableAHS, false); + } + + /** + * @param testName name of the test + * @param numResourceManagers the number of resource managers in the cluster + * @param numNodeManagers the number of node managers in the cluster + * @param numLocalDirs the number of nm-local-dirs per nodemanager + * @param numLogDirs the number of nm-log-dirs per nodemanager */ public MiniYARNCluster( String testName, int numResourceManagers, int numNodeManagers, @@ -243,6 +265,15 @@ public void serviceInit(Configuration conf) throws Exception { rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]); } + // enable the in-memory ZK cluster AHEAD of RMs to ensure it starts first + if (enableRegistry) { + zookeeper = new MicroZookeeperService("Local ZK service"); + addService(zookeeper); + conf.setBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, true); + conf.set(MicroZookeeperServiceKeys.KEY_ZKSERVICE_DIR, + new File(testWorkDir, "zookeeper").getAbsolutePath()); + } + for (int i = 0; i < resourceManagers.length; i++) { resourceManagers[i] = createResourceManager(); if (!useFixedPorts) { @@ -742,4 +773,8 @@ protected void doSecureLogin() throws IOException { public int getNumOfResourceManager() { return this.resourceManagers.length; } + + public MicroZookeeperService getZookeeper() { + return zookeeper; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterRegistry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterRegistry.java new file mode 100644 index 0000000..5ebcf25 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterRegistry.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.server.services.MicroZookeeperService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.registry.RMRegistryService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * Test registry support in the cluster + */ +public class TestMiniYARNClusterRegistry extends Assert { + + + MiniYARNCluster cluster; + + @Rule + public final Timeout testTimeout = new Timeout(10000); + + @Rule + public TestName methodName = new TestName(); + private Configuration conf; + + @Before + public void setup() throws IOException, InterruptedException { + conf = new YarnConfiguration(); + + cluster = new MiniYARNCluster(methodName.getMethodName(), + 1, 1, 1, 1, false, true); + cluster.init(conf); + cluster.start(); + } + + @Test + public void testZKInstance() throws Exception { + assertNotNull("zookeeper", cluster.getZookeeper()); + } + + @Test + public void testZKConnectionAddress() throws Exception { + MicroZookeeperService zookeeper = cluster.getZookeeper(); + InetSocketAddress address = zookeeper.getConnectionAddress(); + assertTrue("Unconfigured address", address.getPort() != 0); + } + + @Test + public void testZKConfigPatchPropagaton() throws Exception { + MicroZookeeperService zookeeper = cluster.getZookeeper(); + String connectionString = zookeeper.getConnectionString(); + String confConnection = conf.get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM); + assertNotNull(confConnection); + assertEquals(connectionString, confConnection); + } + + @Test + public void testRegistryCreated() throws Exception { + assertTrue("registry not enabled", + cluster.getConfig().getBoolean(RegistryConstants.KEY_REGISTRY_ENABLED, + false)); + MicroZookeeperService zookeeper = cluster.getZookeeper(); + String connectionString = zookeeper.getConnectionString(); + String confConnection = conf.get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM); + ResourceManager rm = cluster.getResourceManager(0); + RMRegistryService registry = rm.getRMContext().getRegistry(); + assertNotNull("null registry", registry); + } + + @Test + public void testPathsExist() throws Throwable { + MicroZookeeperService zookeeper = cluster.getZookeeper(); + // service to directly hook in to the ZK server + RegistryOperationsService operations = + new RegistryOperationsService("operations", zookeeper); + operations.init(new YarnConfiguration()); + operations.start(); + + operations.stat("/"); + //verifies that the RM startup has created the system services path + operations.stat(RegistryConstants.PATH_SYSTEM_SERVICES); + + } +}