From 20586faa5741d56f3ca6e20d597548ae4b22da8a Mon Sep 17 00:00:00 2001
From: zhangduo <zhangduo@apache.org>
Date: Mon, 9 Apr 2018 22:24:46 +0800
Subject: [PATCH] HBASE-19079 Support setting up two clusters with A and S state

---
 .../SyncReplicationPeerInfoProviderImpl.java       |   6 +-
 .../hbase/replication/SyncReplicationTestBase.java | 152 +++++++++++++++
 .../hbase/replication/TestSyncReplication.java     | 207 ---------------------
 .../replication/TestSyncReplicationActive.java     |  90 +++++++++
 .../replication/TestSyncReplicationStandBy.java    |  97 ++++++++++
 5 files changed, 343 insertions(+), 209 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index e4afc33..cb33dab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -54,8 +54,10 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
     }
     Pair<SyncReplicationState, SyncReplicationState> states =
       peer.getSyncReplicationStateAndNewState();
-    if (states.getFirst() == SyncReplicationState.ACTIVE &&
-      states.getSecond() == SyncReplicationState.NONE) {
+    if ((states.getFirst() == SyncReplicationState.ACTIVE &&
+      states.getSecond() == SyncReplicationState.NONE) ||
+      (states.getFirst() == SyncReplicationState.DOWNGRADE_ACTIVE &&
+        states.getSecond() == SyncReplicationState.ACTIVE)) {
       return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
     } else {
       return Optional.empty();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
new file mode 100644
index 0000000..c4f8b18
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Base class for testing sync replication.
+ */
+public class SyncReplicationTestBase {
+
+  protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
+
+  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
+
+  protected static byte[] CF = Bytes.toBytes("cf");
+
+  protected static byte[] CQ = Bytes.toBytes("cq");
+
+  protected static String PEER_ID = "1";
+
+  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+    util.setZkCluster(ZK_UTIL.getZkCluster());
+    Configuration conf = util.getConfiguration();
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
+    conf.setInt("replication.source.size.capacity", 102400);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    conf.setFloat("replication.source.ratio", 1.0f);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ZK_UTIL.startMiniZKCluster();
+    initTestingUtility(UTIL1, "/cluster1");
+    initTestingUtility(UTIL2, "/cluster2");
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
+    TableDescriptor td =
+      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL1.getAdmin().createTable(td);
+    UTIL2.getAdmin().createTable(td);
+    FileSystem fs1 = UTIL1.getTestFileSystem();
+    FileSystem fs2 = UTIL2.getTestFileSystem();
+    Path remoteWALDir1 =
+      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
+    Path remoteWALDir2 =
+      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
+    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL1.shutdownMiniCluster();
+    UTIL2.shutdownMiniCluster();
+    ZK_UTIL.shutdownMiniZKCluster();
+  }
+
+  protected static void writeAndVerifyReplication(int start, int end) throws Exception {
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      for (int i = start; i < end; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    // We should still allow replication writes
+    // The reject check is in RSRpcService so we can still read through HRegion
+    HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not caught up yet";
+      }
+    });
+    for (int i = start; i < end; i++) {
+      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+    }
+  }
+
+  protected static Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
+    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    return new Path(remoteWALDir, peerId);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
deleted file mode 100644
index 288dcbf..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSyncReplication {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestSyncReplication.class);
-
-  private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
-
-  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
-
-  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
-
-  private static TableName TABLE_NAME = TableName.valueOf("SyncRep");
-
-  private static byte[] CF = Bytes.toBytes("cf");
-
-  private static byte[] CQ = Bytes.toBytes("cq");
-
-  private static String PEER_ID = "1";
-
-  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
-    util.setZkCluster(ZK_UTIL.getZkCluster());
-    Configuration conf = util.getConfiguration();
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
-    conf.setInt("replication.source.size.capacity", 102400);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
-    conf.setInt("zookeeper.recovery.retry", 1);
-    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
-    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
-    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-    conf.setFloat("replication.source.ratio", 1.0f);
-    conf.setBoolean("replication.source.eof.autorecovery", true);
-  }
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    ZK_UTIL.startMiniZKCluster();
-    initTestingUtility(UTIL1, "/cluster1");
-    initTestingUtility(UTIL2, "/cluster2");
-    UTIL1.startMiniCluster(3);
-    UTIL2.startMiniCluster(3);
-    TableDescriptor td =
-      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
-    UTIL1.getAdmin().createTable(td);
-    UTIL2.getAdmin().createTable(td);
-    FileSystem fs1 = UTIL1.getTestFileSystem();
-    FileSystem fs2 = UTIL2.getTestFileSystem();
-    Path remoteWALDir1 =
-      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
-        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
-    Path remoteWALDir2 =
-      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
-        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
-    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
-        .setReplicateAllUserTables(false)
-        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
-        .setReplicateAllUserTables(false)
-        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    UTIL1.shutdownMiniCluster();
-    UTIL2.shutdownMiniCluster();
-    ZK_UTIL.shutdownMiniZKCluster();
-  }
-
-  @FunctionalInterface
-  private interface TableAction {
-
-    void call(Table table) throws IOException;
-  }
-
-  private void assertDisallow(Table table, TableAction action) throws IOException {
-    try {
-      action.call(table);
-    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
-      // expected
-      assertThat(e.getMessage(), containsString("STANDBY"));
-    }
-  }
-
-  @Test
-  public void testStandby() throws Exception {
-    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
-    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-    Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID);
-    assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
-    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
-      SyncReplicationState.STANDBY);
-    assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
-    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
-      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
-      assertDisallow(table,
-        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
-      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
-      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
-      assertDisallow(table,
-        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
-      assertDisallow(table,
-        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
-      assertDisallow(table,
-        t -> t
-          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
-            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
-      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
-        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
-    }
-    // But we should still allow replication writes
-    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
-      for (int i = 0; i < 100; i++) {
-        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
-      }
-    }
-    // The reject check is in RSRpcService so we can still read through HRegion
-    HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
-    UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return !region.get(new Get(Bytes.toBytes(99))).isEmpty();
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Replication has not been catched up yet";
-      }
-    });
-    for (int i = 0; i < 100; i++) {
-      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
new file mode 100644
index 0000000..86f258e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationActive extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
+  @Test
+  public void testActive() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    writeAndVerifyReplication(0, 100);
+    String walPrefix = URLEncoder.encode(
+      UTIL1.getRSForFirstRegionInTable(TABLE_NAME).getServerName().getServerName(),
+      StandardCharsets.UTF_8.name());
+    RegionInfo region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0).getRegionInfo();
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    Path remoteWAL = Stream.of(mfs.getFileSystem().listStatus(remoteWALDir))
+      .filter(f -> f.getPath().getName().contains(walPrefix)).findFirst().get().getPath();
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL2.getTestFileSystem(), remoteWAL, UTIL2.getConfiguration())) {
+      for (int i = 0; i < 100;) {
+        WAL.Entry entry = reader.next();
+        assertNotNull(entry);
+        assertArrayEquals(region.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+        assertEquals(TABLE_NAME, entry.getKey().getTableName());
+        assertEquals(1, entry.getEdit().size());
+        Cell cell = entry.getEdit().getCells().get(0);
+        if (WALEdit.isMetaEditFamily(cell)) {
+          // skip region events
+          continue;
+        }
+        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneRow(cell));
+        assertArrayEquals(CF, CellUtil.cloneFamily(cell));
+        assertArrayEquals(CQ, CellUtil.cloneQualifier(cell));
+        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(cell));
+        i++;
+      }
+      assertNull(reader.next());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
new file mode 100644
index 0000000..d11406c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationStandBy.class);
+
+  @FunctionalInterface
+  private interface TableAction {
+
+    void call(Table table) throws IOException;
+  }
+
+  private void assertDisallow(Table table, TableAction action) throws IOException {
+    try {
+      action.call(table);
+      fail("Should fail as the cluster is in STANDBY state");
+    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+      // expected
+      assertThat(e.getMessage(), containsString("STANDBY"));
+    }
+  }
+
+  @Test
+  public void testStandby() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
+      assertDisallow(table,
+        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
+      assertDisallow(table,
+        t -> t
+          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
+      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
+        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+    }
+    writeAndVerifyReplication(0, 100);
+  }
+}
-- 
2.7.4
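
Note for reviewers: a minimal sketch of how the two clusters end up in the A
(Active) and S (Standby) states named in the subject, mirroring the Admin
calls the new tests make. UTIL1/UTIL2 and PEER_ID are the fixtures from
SyncReplicationTestBase above; nothing here goes beyond what the patch itself
exercises.

    // Cluster 2 becomes Standby (S): user reads/writes are rejected with a
    // "STANDBY" error, and the remote WAL directory for the peer is created.
    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.STANDBY);
    // Cluster 1 becomes Active (A): with the SyncReplicationPeerInfoProviderImpl
    // change, both the steady (ACTIVE, NONE) pair and the transitional
    // (DOWNGRADE_ACTIVE, ACTIVE) pair expose the remote WAL dir, so cluster 1
    // also writes its WAL entries into the remoteWALs directory on cluster 2.
    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
      SyncReplicationState.ACTIVE);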