diff --git hadoop-project/pom.xml hadoop-project/pom.xml index d8bfaa2..7980738 100644 --- hadoop-project/pom.xml +++ hadoop-project/pom.xml @@ -698,7 +698,6 @@ zookeeper 3.4.5 test-jar - test org.jboss.netty diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 91dc26c..68f1153 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -182,7 +182,6 @@ org.apache.zookeeper zookeeper test-jar - test @@ -228,6 +227,13 @@ test-compile + + + + org.apache.hadoop.test.YarnTestDriver + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index affc6f9..509814a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -369,6 +370,12 @@ public String getDiagnostics() { public long getFinishTime() { return finishTime; } + + @VisibleForTesting + public void setAppAttemptIds( + Map attemptIds) { + attempts = attemptIds; + } } public static class RMDTSecretManagerState { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java new file mode 100644 index 0000000..3c2736b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import org.apache.hadoop.util.ProgramDriver; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf; + +/** + * Driver for Yarn tests. + * + */ +public class YarnTestDriver { + + private ProgramDriver pgd; + + public YarnTestDriver() { + this(new ProgramDriver()); + } + + public YarnTestDriver(ProgramDriver pgd) { + this.pgd = pgd; + try { + pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(), + TestZKRMStateStorePerf.class, + "ZKRMStateStore i/o benchmark."); + } catch(Throwable e) { + e.printStackTrace(); + } + } + + public void run(String argv[]) { + int exitCode = -1; + try { + exitCode = pgd.run(argv); + } catch(Throwable e) { + e.printStackTrace(); + } + System.exit(exitCode); + } + + public static void main(String argv[]){ + new YarnTestDriver().run(argv); + } +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 507e164..5004115 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import org.apache.hadoop.yarn.event.Event; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -74,7 +75,7 @@ public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); static class TestDispatcher implements - Dispatcher, EventHandler { + Dispatcher, EventHandler { ApplicationAttemptId attemptId; Exception storedException; @@ -88,9 +89,12 @@ public void register(Class eventType, } @Override - public void handle(RMAppAttemptNewSavedEvent event) { - assertEquals(attemptId, event.getApplicationAttemptId()); - assertEquals(storedException, event.getStoredException()); + public void handle(Event event) { + if (event instanceof RMAppAttemptNewSavedEvent){ + RMAppAttemptNewSavedEvent rmEvent = (RMAppAttemptNewSavedEvent) event; + assertEquals(attemptId, rmEvent.getApplicationAttemptId()); + assertEquals(storedException, rmEvent.getStoredException()); + } notified = true; synchronized (this) { notifyAll(); @@ -130,7 +134,8 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, + protected RMApp storeApp(RMStateStore store, ApplicationId appId, + long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -146,7 +151,8 @@ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, return mockApp; } - ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, + protected ContainerId storeAttempt(RMStateStore store, + ApplicationAttemptId attemptId, String containerIdStr, Token appToken, SecretKey clientTokenMasterKey, TestDispatcher dispatcher) throws Exception { @@ -450,7 +456,7 @@ public void testRMDTSecretManagerStateStore( } - private Token generateAMRMToken( + protected Token generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { AMRMTokenIdentifier appTokenId = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java new file mode 100644 index 0000000..2d856e2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import com.google.common.base.Optional; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import javax.crypto.SecretKey; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestZKRMStateStorePerf extends RMStateStoreTestBase + implements Tool { + public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); + + final String version = "0.1"; + + // Configurable variables for performance test + private int ZK_PERF_NUM_APP_DEFAULT = 100; + private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 100; + + private static final int ZK_TIMEOUT_MS = 100000; + private final long clusterTimeStamp = 1352994193343L; + + private static final String USAGE = + "Usage: " + TestZKRMStateStorePerf.class.getSimpleName() + + " -appSize numberOfApplications" + + " -appAttemptSize numberOfApplicationAttempts" + + " [-hostPort Host:Port]" + + " [-workingZnode rootZnodeForTesting]\n"; + + private YarnConfiguration conf = null; + private String workingZnode = "/Test"; + private ZKRMStateStore store; + private AMRMTokenSecretManager appTokenMgr; + private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr; + + @Before + public void setUpZKServer() throws Exception { + super.setUp(); + } + + @After + public void tearDown() throws Exception { + if (store != null) { + store.stop(); + } + if (appTokenMgr != null) { + appTokenMgr.stop(); + } + super.tearDown(); + } + + private void initStore(String hostPort) { + Optional optHostPort = Optional.fromNullable(hostPort); + + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort)); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + + store = new ZKRMStateStore(); + store.init(conf); + store.start(); + appTokenMgr = new AMRMTokenSecretManager(conf); + clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); + } + + @Override + public int run(String[] args) { + LOG.info("Starting ZKRMStateStorePerf ver." + version); + + int numApp = ZK_PERF_NUM_APP_DEFAULT; + int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP; + String hostPort = null; + boolean launchLocalZK= true; + + if (args.length == 0) { + System.err.println("Missing arguments."); + return -1; + } + + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equalsIgnoreCase("-appsize")) { + numApp = Integer.parseInt(args[++i]); + } else if (args[i].equalsIgnoreCase("-appattemptsize")) { + numAppAttemptPerApp = Integer.parseInt(args[++i]); + } else if (args[i].equalsIgnoreCase("-hostPort")) { + hostPort = args[++i]; + launchLocalZK = false; + } else if (args[i].equalsIgnoreCase("-workingZnode")) { + workingZnode = args[++i]; + } else { + System.err.println("Illegal argument: " + args[i]); + return -1; + } + } + + if (launchLocalZK) { + try { + setUp(); + } catch (Exception e) { + System.err.println("failed to setup. : " + e.getMessage()); + return -1; + } + } + + initStore(hostPort); + + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + + ArrayList applicationIds = new ArrayList(); + ArrayList rmApps = new ArrayList(); + ArrayList attemptIds = + new ArrayList(); + HashMap> appIdsToAttemptId = + new HashMap>(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + for (int i = 0; i < numApp; i++) { + ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i); + applicationIds.add(appId); + for (int j = 0; j < numAppAttemptPerApp; j++) { + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, j); + attemptIds.add(attemptId); + } + appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIds)); + } + + for (ApplicationId appId : applicationIds) { + RMApp app = null; + try { + app = storeApp(store, appId, submitTime, startTime); + } catch (Exception e) { + System.err.println("failed to create Application Znode. : " + + e.getMessage()); + return -1; + } + waitNotify(dispatcher); + rmApps.add(app); + } + + for (ApplicationAttemptId attemptId : attemptIds) { + Token tokenId = + generateAMRMToken(attemptId, appTokenMgr); + SecretKey clientTokenKey = + clientToAMTokenMgr.createMasterKey(attemptId); + try { + storeAttempt(store, attemptId, + ContainerId.newInstance(attemptId, 0).toString(), + tokenId, clientTokenKey, dispatcher); + } catch (Exception e) { + System.err.println("failed to create AppAttempt Znode. : " + + e.getMessage()); + return -1; + } + } + + long storeStart = System.currentTimeMillis(); + try { + store.loadState(); + } catch (Exception e) { + System.err.println("failed to locaState from ZKRMStateStore. : " + + e.getMessage()); + return -1; + } + long storeEnd = System.currentTimeMillis(); + + long loadTime = storeEnd - storeStart; + + String resultMsg = "ZKRMStateStore takes " + loadTime + " msec to loadState."; + LOG.info(resultMsg); + System.out.println(resultMsg); + + // cleanup + try { + for (RMApp app : rmApps) { + RMStateStore.ApplicationState appState = + new RMStateStore.ApplicationState(app.getSubmitTime(), + app.getStartTime(), app.getApplicationSubmissionContext(), + app.getUser()); + ApplicationId appId = app.getApplicationId(); + Map m = mock(Map.class); + when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId)); + appState.setAppAttemptIds(m); + store.removeApplicationStateInternal(appState); + } + } catch (Exception e) { + System.err.println("failed to cleanup. : " + e.getMessage()); + return -1; + } + + return 0; + } + + @Override + public void setConf(Configuration conf) { + // currently this function is just ignored + } + + @Override + public Configuration getConf() { + return conf; + } + + @Test + public void perfZKRMStateStore() throws Exception { + String[] args = { + "-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT), + "-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP) + }; + run(args); + } + + static public void main(String[] args) throws Exception { + TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf(); + + int res = -1; + try { + res = ToolRunner.run(perf, args); + } catch(Exception e) { + System.err.print(StringUtils.stringifyException(e)); + res = -2; + } + if(res == -1) { + System.err.print(USAGE); + } + System.exit(res); + } +}