From 76d21aa3c800e730f8b291f547458ce0584492ee Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 5 Nov 2014 16:22:24 -0800 Subject: [PATCH] TestReplicationIngest --- .../hbase/replication/TestReplicationIngest.java | 165 +++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationIngest.java diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationIngest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationIngest.java new file mode 100644 index 0000000..eea0d9f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationIngest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MultiThreadedAction; +import org.apache.hadoop.hbase.util.MultiThreadedReader; +import org.apache.hadoop.hbase.util.MultiThreadedUpdater; +import org.apache.hadoop.hbase.util.MultiThreadedWriter; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestReplicationIngest { + + private static final Log LOG = LogFactory.getLog(TestReplicationIngest.class); + private static final TableName TABLE = TableName.valueOf("TestReplicationIngest"); + private static final byte[] CF = Bytes.toBytes("cf"); + + private static MiniZooKeeperCluster miniZK; + private static HBaseTestingUtility[] utilities; + private static Configuration[] configurations; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean("dfs.support.append", true); + conf.setInt("hbase.regionserver.logroll.period", 10000); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setBoolean("hbase.regionserver.separate.hlog.for.meta", true); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + startMiniClusters(conf, 2); + createPreSplitTestTable(utilities[0]); + createPreSplitTestTable(utilities[1]); + addPeer("1", 0, 1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + shutDownMiniClusters(); + } + + @Test + public void testReplicationIngest() throws Exception { + int numKeys = 10000; + LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF); + + for (int i = 0; i < 10; i++) { + // Write on cluster 0 (master) + MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, configurations[0], TABLE); + MultiThreadedUpdater updater = new MultiThreadedUpdater(dataGen, configurations[0], TABLE, 20); + updater.linkToWriter(writer); + + // Verify on cluster 1 (slave) + MultiThreadedReader reader = new MultiThreadedReader(dataGen, configurations[1], TABLE, + 100); + reader.linkToWriter(writer); + + LOG.info("Generating key block " + (i+1)); + writer.start(i * numKeys, (i+1) * numKeys, 10); + updater.start(i * numKeys, (i+1) * numKeys, 10); + writer.waitForFinish(); + updater.waitForFinish(); + assertEquals(0, writer.getNumWriteFailures()); + assertEquals(0, updater.getNumWriteFailures()); + + Thread.sleep(10 * 1000); + + LOG.info("Verifying key block " + (i+1)); + reader.start(i * numKeys, (i+1) * numKeys, 10); + reader.waitForFinish(); + assertEquals(0, reader.getNumReadFailures()); + assertEquals(0, reader.getNumReadErrors()); + } + } + + @SuppressWarnings("resource") + private static void startMiniClusters(Configuration template, int numClusters) + throws Exception { + Random random = new Random(); + utilities = new HBaseTestingUtility[numClusters]; + configurations = new Configuration[numClusters]; + for (int i = 0; i < numClusters; i++) { + Configuration conf = new Configuration(template); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt()); + HBaseTestingUtility utility = new HBaseTestingUtility(conf); + if (i == 0) { + utility.startMiniZKCluster(); + miniZK = utility.getZkCluster(); + } else { + utility.setZkCluster(miniZK); + } + utility.startMiniCluster(); + utilities[i] = utility; + configurations[i] = conf; + new ZooKeeperWatcher(conf, "cluster" + i, null, true); + } + } + + private static void shutDownMiniClusters() throws Exception { + int numClusters = utilities.length; + for (int i = numClusters - 1; i >= 0; i--) { + if (utilities[i] != null) { + utilities[i].shutdownMiniCluster(); + } + } + miniZK.shutdown(); + } + + private static void createPreSplitTestTable(HBaseTestingUtility util) throws IOException { + HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), + new HTableDescriptor(TABLE), + new HColumnDescriptor(CF) + .setScope(HConstants.REPLICATION_SCOPE_GLOBAL), 3); + util.waitUntilAllRegionsAssigned(TABLE); + } + + private static void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) + throws Exception { + ReplicationAdmin replicationAdmin = null; + try { + replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]); + replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey()); + } finally { + replicationAdmin.close(); + } + } +} -- 1.7.12.4 (Apple Git-37)