From ed55e1b9c1465727e6888654b62e37a6a27589ac Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 15 Mar 2018 21:03:47 +0800
Subject: [PATCH] HBASE-20147 Serial replication will be stuck if we create a
 table with serial replication but add it to a peer after there are region
 moves

---
 .../org/apache/hadoop/hbase/MetaTableAccessor.java |  20 +-
 .../src/main/protobuf/MasterProcedure.proto        |   1 +
 .../hbase/replication/ReplicationQueueStorage.java |   8 +
 .../hadoop/hbase/replication/ReplicationUtils.java |   6 +-
 .../replication/TableReplicationQueueStorage.java  |  18 ++
 .../replication/ZKReplicationQueueStorage.java     |  49 +++--
 .../hbase/master/replication/AddPeerProcedure.java |   9 +-
 .../master/replication/ReplicationPeerManager.java |  75 ++++++-
 .../replication/UpdatePeerConfigProcedure.java     |  24 ++-
 .../regionserver/PeerProcedureHandlerImpl.java     |   3 +-
 .../replication/SerialReplicationTestBase.java     | 233 +++++++++++++++++++++
 .../TestAddToSerialReplicationPeer.java            | 150 +++++++++++++
 .../hbase/replication/TestSerialReplication.java   | 191 +----------------
 13 files changed, 566 insertions(+), 221 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
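The failure mode being fixed: a serial replication source may only ship an edit once every earlier edit of the same region has been shipped, which it checks against the region's last pushed sequence id. If a region was moved before its table was added to the peer, no last pushed sequence id was ever recorded for it, so the source waits forever. The workflow below is a minimal sketch of the scenario this patch makes work, using only Admin API calls that also appear in the tests further down; the table name, peer id and cluster key are illustrative, and imports and error handling are elided:

    // Assumes an open Connection `conn` to the source cluster.
    Admin admin = conn.getAdmin();
    TableName tn = TableName.valueOf("serial");
    admin.createTable(TableDescriptorBuilder.newBuilder(tn)
      .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("CF"))
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .build());
    // ... writes happen and regions are moved between region servers ...
    // A serial peer must now be added in DISABLED state (enforced by the new
    // preAddPeer check below) so the master can seed the last pushed sequence
    // ids before any source starts shipping.
    admin.addReplicationPeer("1",
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setSerial(true).build(),
      false); // enabled = false
    admin.enableReplicationPeer("1");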
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 8858e71..cff0670 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -1168,8 +1168,9 @@ public class MetaTableAccessor {
       final List<Result> results = new ArrayList<>();
       @Override
       public boolean visit(Result r) throws IOException {
-        if (r == null || r.isEmpty()) return true;
-        add(r);
+        if (r != null && !r.isEmpty()) {
+          add(r);
+        }
         return true;
       }
@@ -2108,6 +2109,21 @@ public class MetaTableAccessor {
     }
   }
 
+  public static List<HRegionLocation> getTablePrimaryRegionsAndLocations(Connection conn,
+      TableName tableName) throws IOException {
+    List<HRegionLocation> list = new ArrayList<>();
+    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REGION),
+      getTableStopRowForMeta(tableName, QueryType.REGION), QueryType.REGION, r -> {
+        RegionLocations locs = getRegionLocations(r);
+        if (locs == null || locs.size() == 0) {
+          return true;
+        }
+        list.add(locs.getDefaultRegionLocation());
+        return true;
+      });
+    return list;
+  }
+
   private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 1134bd6..acc29ff 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -413,4 +413,5 @@ message AddPeerStateData {
 
 message UpdatePeerConfigStateData {
   required ReplicationPeer peer_config = 1;
+  optional ReplicationPeer old_peer_config = 2;
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 4c93da6..ba8c0be 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -79,6 +79,14 @@ public interface ReplicationQueueStorage {
   long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
 
   /**
+   * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
+   * a serial replication peer.
+   * @param peerId peer id
+   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
+   */
+  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
+
+  /**
    * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
    * @param queueId a String that identifies the queue
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 2e86c17..8d216f6 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -120,13 +120,11 @@ public final class ReplicationUtils {
     return true;
   }
 
-  public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+  public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
+      ReplicationPeerConfig rpc2) {
     if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
       return false;
     }
-    if (rpc1.isSerial() != rpc2.isSerial()) {
-      return false;
-    }
     if (rpc1.replicateAllUserTables()) {
       return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
         isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index abb279d..e1f0cf1 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -198,6 +198,24 @@ public class TableReplicationQueueStorage extends TableReplicationStorageBase
     }
   }
 
+  @Override
+  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+      throws ReplicationException {
+    List<Put> puts = new ArrayList<>();
+    for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) {
+      Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS,
+        getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue()));
+      puts.add(regionPut);
+    }
+    try (Table table = getReplicationMetaTable()) {
+      table.put(puts);
+    } catch (IOException e) {
+      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
+        ", lastSeqIds.size=" + lastSeqIds.size(), e);
+    }
+  }
+
   @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
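The new storage method is keyed by encoded region name, with one value per region. A minimal usage sketch, assuming a ReplicationQueueStorage instance queueStorage and two RegionInfo instances regionA and regionB are in scope (the peer id and sequence ids are illustrative):

    // Record, for peer "1", that everything up to these sequence ids is
    // considered already pushed (or never in need of pushing).
    Map<String, Long> lastSeqIds = new HashMap<>();
    lastSeqIds.put(regionA.getEncodedName(), 41L);
    lastSeqIds.put(regionB.getEncodedName(), 99L);
    queueStorage.setLastSequenceIds("1", lastSeqIds);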
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index fa0ff0e..e8a8eff 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -189,6 +189,24 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     }
   }
 
+  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
+      List<ZKUtilOp> listOfOps) throws KeeperException {
+    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+      String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
+      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+      /*
+       * Make sure the existence of path
+       * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
+       * multiOrSequential() method said, if received a NodeExistsException, all operations will
+       * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
+       * because only need to make sure that update file position and sequence id atomically.
+       */
+      ZKUtil.createWithParents(zookeeper, path);
+      // Persist the max sequence id of region to zookeeper.
+      listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+    }
+  }
+
   @Override
   public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
       Map<String, Long> lastSeqIds) throws ReplicationException {
@@ -197,22 +215,8 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
       listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
         ZKUtil.positionToByteArray(position)));
       // Persist the max sequence id(s) of regions for serial replication atomically.
-      if (lastSeqIds != null && lastSeqIds.size() > 0) {
-        for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-          String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
-          String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
-          /*
-           * Make sure the existence of path
-           * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
-           * multiOrSequential() method said, if received a NodeExistsException, all operations will
-           * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
-           * because only need to make sure that update file position and sequence id atomically.
-           */
-          ZKUtil.createWithParents(zookeeper, path);
-          // Persist the max sequence id of region to zookeeper.
-          listOfOps
-            .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
-        }
+      if (lastSeqIds != null && !lastSeqIds.isEmpty()) {
+        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
       }
       ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
     } catch (KeeperException e) {
@@ -242,6 +246,19 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
+      addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
+        ", lastSeqIds.size=" + lastSeqIds.size(), e);
+    }
+  }
+
+  @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index f0f7704..e8e9d5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -63,7 +63,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);
     }
-    env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
+    env.getReplicationPeerManager().preAddPeer(peerId, peerConfig, enabled);
   }
 
   @Override
@@ -72,7 +72,12 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    if (peerConfig.isSerial()) {
+      env.getReplicationPeerManager().setLastSequenceIdForSerialPeer(peerId, peerConfig, null,
+        env.getMasterServices());
+    }
     LOG.info("Successfully added {} peer {}, config {}", enabled ?
       "ENABLED" : "DISABLED", peerId, peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 1e93373..9261c93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -28,11 +29,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -85,7 +92,7 @@ public class ReplicationPeerManager {
     }
   }
 
-  public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
+  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
       throws DoNotRetryIOException, ReplicationException {
     if (peerId.contains("-")) {
       throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
@@ -99,6 +106,9 @@ public class ReplicationPeerManager {
     // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
     // file may also be replicated.
     checkQueuesDeleted(peerId);
+    if (peerConfig.isSerial() && enabled) {
+      throw new DoNotRetryIOException("Can only add serial replication peer in DISABLED state");
+    }
   }
 
   private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
@@ -109,25 +119,28 @@ public class ReplicationPeerManager {
     return desc;
   }
 
-  public void preRemovePeer(String peerId) throws DoNotRetryIOException {
+  void preRemovePeer(String peerId) throws DoNotRetryIOException {
     checkPeerExists(peerId);
   }
 
-  public void preEnablePeer(String peerId) throws DoNotRetryIOException {
+  void preEnablePeer(String peerId) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     if (desc.isEnabled()) {
       throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
     }
   }
 
-  public void preDisablePeer(String peerId) throws DoNotRetryIOException {
+  void preDisablePeer(String peerId) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     if (!desc.isEnabled()) {
       throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
     }
   }
 
-  public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+  /**
+   * Return the old peer config. Can never be null.
+   */
+  ReplicationPeerConfig preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
       throws DoNotRetryIOException {
     checkPeerConfig(peerConfig);
     ReplicationPeerDescription desc = checkPeerExists(peerId);
@@ -146,6 +159,17 @@ public class ReplicationPeerManager {
       oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
       " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
     }
+    if (peerConfig.isSerial() && desc.isEnabled()) {
+      if (!oldPeerConfig.isSerial()) {
+        throw new DoNotRetryIOException(
+          "Can only change peer in DISABLED state to serial replication");
+      }
+      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)) {
+        throw new DoNotRetryIOException("Can only change namespace/table-cfs config in " +
+          "DISABLED state for serial replication peer");
+      }
+    }
+    return oldPeerConfig;
   }
 
   public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -216,7 +240,7 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
-  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
     // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
     // on-going when the refresh peer config procedure is done, if a RS which has already been
     // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -233,6 +257,45 @@ public class ReplicationPeerManager {
     queueStorage.removePeerFromHFileRefs(peerId);
   }
 
+  private List<TableDescriptor> getTablesToSet(TableDescriptors tds,
+      ReplicationPeerConfig peerConfig, ReplicationPeerConfig oldPeerConfig) throws IOException {
+    Stream<TableDescriptor> stream =
+      tds.getAll().values().stream().filter(TableDescriptor::hasGlobalReplicationScope)
+        .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
+    // filter out the tables already in the peer config
+    if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
+      stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
+    }
+    return stream.collect(Collectors.toList());
+  }
+
+  void setLastSequenceIdForSerialPeer(String peerId, ReplicationPeerConfig peerConfig,
+      ReplicationPeerConfig oldPeerConfig, MasterServices master) throws ReplicationException {
+    try {
+      List<TableDescriptor> tds =
+        getTablesToSet(master.getTableDescriptors(), peerConfig, oldPeerConfig);
+      Map<String, Long> lastSeqIds = new HashMap<>();
+      for (TableDescriptor td : tds) {
+        for (HRegionLocation loc : MetaTableAccessor
+          .getTablePrimaryRegionsAndLocations(master.getConnection(), td.getTableName())) {
+          // only update if the region has already been moved since it was created
+          if (loc.getSeqNum() > 0) {
+            lastSeqIds.put(loc.getRegion().getEncodedName(), loc.getSeqNum() - 1);
+            if (lastSeqIds.size() >= 1000) {
+              queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+              lastSeqIds.clear();
+            }
+          }
+        }
+      }
+      if (!lastSeqIds.isEmpty()) {
+        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+      }
+    } catch (IOException e) {
+      throw new ReplicationException(e);
+    }
+  }
+
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
     checkClusterKey(peerConfig.getClusterKey());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index 3497447..a38a50c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -40,6 +40,8 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
 
   private ReplicationPeerConfig peerConfig;
 
+  private ReplicationPeerConfig oldPeerConfig;
+
   public UpdatePeerConfigProcedure() {
   }
 
@@ -59,7 +61,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
     if (cpHost != null) {
       cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
     }
-    env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
+    oldPeerConfig = env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
   }
 
   @Override
@@ -68,7 +70,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    if (peerConfig.isSerial()) {
+      env.getReplicationPeerManager().setLastSequenceIdForSerialPeer(peerId, peerConfig,
+        oldPeerConfig, env.getMasterServices());
+    }
     LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
@@ -79,14 +86,19 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
-    serializer.serialize(UpdatePeerConfigStateData.newBuilder()
-      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
+    UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder()
+      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+    if (oldPeerConfig != null) {
+      builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig));
+    }
+    serializer.serialize(builder.build());
   }
 
   @Override
   protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.deserializeStateData(serializer);
-    peerConfig = ReplicationPeerConfigUtil
-      .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
+    UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class);
+    peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
+    oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig());
   }
 }
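Together with the preUpdatePeerConfig checks above, this gives a supported path for turning an existing peer into a serial one: the peer must be disabled first, otherwise the update is rejected. A sketch of the admin-side steps, mirroring the testChangeToSerial case below (the peer id is illustrative and the Admin instance is assumed to be in scope):

    admin.disableReplicationPeer("1");
    ReplicationPeerConfig current = admin.getReplicationPeerConfig("1");
    admin.updateReplicationPeerConfig("1",
      ReplicationPeerConfig.newBuilder(current).setSerial(true).build());
    // UpdatePeerConfigProcedure.postPeerModification has now seeded the last
    // pushed sequence ids for regions that moved before the switch, so
    // enabling the peer cannot get stuck.
    admin.enableReplicationPeer("1");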
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index a02d181..4b98bdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -111,7 +111,8 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
       ReplicationPeerConfig newConfig =
         replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
       // RS need to start work with the new replication config change
-      if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
+        oldConfig.isSerial() != newConfig.isSerial()) {
         replicationSourceManager.refreshSources(peerId);
       }
       success = true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
new file mode 100644
index 0000000..8bc8229
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for testing serial replication.
+ */
+public class SerialReplicationTestBase {
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static String PEER_ID = "1";
+
+  protected static byte[] CF = Bytes.toBytes("CF");
+
+  protected static byte[] CQ = Bytes.toBytes("CQ");
+
+  protected static FileSystem FS;
+
+  protected static Path LOG_DIR;
+
+  protected static WALProvider.Writer WRITER;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  protected Path logPath;
+
+  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private static final UUID PEER_UUID = UUID.randomUUID();
+
+    @Override
+    public UUID getPeerUUID() {
+      return PEER_UUID;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      synchronized (WRITER) {
+        try {
+          for (Entry entry : replicateContext.getEntries()) {
+            WRITER.append(entry);
+          }
+          WRITER.sync();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+    UTIL.startMiniCluster(3);
+    // disable balancer
+    UTIL.getAdmin().balancerSwitch(false, true);
+    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+    FS = UTIL.getTestFileSystem();
+    FS.mkdirs(LOG_DIR);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    rollAllWALs();
+    if (WRITER != null) {
+      WRITER.close();
+      WRITER = null;
+    }
+  }
+
+  protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
+    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(rs.getServerName().getServerName()));
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return rs.getRegion(region.getEncodedName()) != null;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return region + " is still not on " + rs;
+      }
+    });
+  }
+
+  protected static void rollAllWALs() throws Exception {
+    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+      t.getRegionServer().getWalRoller().requestRollAll();
+    }
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Log roll has not finished yet";
+      }
+    });
+  }
+
+  protected final void setupWALWriter() throws IOException {
+    logPath = new Path(LOG_DIR, name.getMethodName());
+    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+  }
+
+  protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+          int count = 0;
+          while (reader.next() != null) {
+            count++;
+          }
+          return count >= expectedEntries;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Not enough entries replicated";
+      }
+    });
+  }
+
+  protected final void addPeer(boolean enabled) throws IOException {
+    UTIL.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
+        .build(),
+      false);
+    if (enabled) {
+      UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+    }
+  }
+
+  protected final void checkOrder(int expectedEntries) throws IOException {
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      long seqId = -1L;
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        assertTrue(
+          "Sequence id went backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
+          entry.getKey().getSequenceId() >= seqId);
+        seqId = entry.getKey().getSequenceId();
+        count++;
+      }
+      assertEquals(expectedEntries, count);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
new file mode 100644
index 0000000..867e7ba
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-20147.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+  }
+
+  // Make sure that we will start replicating from the sequence id after the move; that is what
+  // we want to test here.
+  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
+    moveRegion(region, rs);
+    rollAllWALs();
+  }
+
+  private void waitUntilReplicatedToTheCurrentWAL(HRegionServer rs) throws Exception {
+    Path currentWAL = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        Path replicating = ((Replication) rs.getReplicationSourceService()).getReplicationManager()
+          .getSource(PEER_ID).getCurrentPath();
+        return replicating.getName().equals(currentWAL.getName());
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still not replicated to " + currentWAL + " yet";
+      }
+    });
+
+  }
+
+  @Test
+  public void testAddPeer() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegionAndArchiveOldWals(region, rs);
+    addPeer(true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
+  @Ignore
+  @Test
+  public void testChangeToSerial() throws Exception {
+    ReplicationPeerConfig peerConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+
+    TableName tableName = TableName.valueOf(name.getMethodName());
+
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
+    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
+    moveRegionAndArchiveOldWals(region, rs);
+    waitUntilReplicationDone(100);
+    waitUntilReplicatedToTheCurrentWAL(srcRs);
+
+    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
+    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(200);
+    checkOrder(200);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index f8efcf0..94b79d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -23,211 +23,49 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSerialReplication {
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplication extends SerialReplicationTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestSerialReplication.class);
 
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static String PEER_ID = "1";
-
-  private static byte[] CF = Bytes.toBytes("CF");
-
-  private static byte[] CQ = Bytes.toBytes("CQ");
-
-  private static FileSystem FS;
-
-  private static Path LOG_DIR;
-
-  private static WALProvider.Writer WRITER;
-
-  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
-
-    private static final UUID PEER_UUID = UUID.randomUUID();
-
-    @Override
-    public UUID getPeerUUID() {
-      return PEER_UUID;
-    }
-
-    @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      synchronized (WRITER) {
-        try {
-          for (Entry entry : replicateContext.getEntries()) {
-            WRITER.append(entry);
-          }
-          WRITER.sync();
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public void start() {
-      startAsync();
-    }
-
-    @Override
-    public void stop() {
-      stopAsync();
-    }
-
-    @Override
-    protected void doStart() {
-      notifyStarted();
-    }
-
-    @Override
-    protected void doStop() {
-      notifyStopped();
-    }
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
-    UTIL.startMiniCluster(3);
-    // disable balancer
-    UTIL.getAdmin().balancerSwitch(false, true);
-    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
-    FS = UTIL.getTestFileSystem();
-    FS.mkdirs(LOG_DIR);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    UTIL.shutdownMiniCluster();
-  }
-
-  @Rule
-  public final TestName name = new TestName();
-
-  private Path logPath;
-
   @Before
   public void setUp() throws IOException, StreamLacksCapabilityException {
-    logPath = new Path(LOG_DIR, name.getMethodName());
-    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+    setupWALWriter();
     // add in disable state, so later when enabling it all sources will start push together.
-    UTIL.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
-        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
-        .build(),
-      false);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
-    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
-      t.getRegionServer().getWalRoller().requestRollAll();
-    }
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
-          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Log roll has not finished yet";
-      }
-    });
-    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
-      t.getRegionServer().getWalRoller().requestRollAll();
-    }
-    if (WRITER != null) {
-      WRITER.close();
-      WRITER = null;
-    }
-  }
-
-  private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
-    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
-      Bytes.toBytes(rs.getServerName().getServerName()));
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return rs.getRegion(region.getEncodedName()) != null;
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return region + " is still not on " + rs;
-      }
-    });
+    addPeer(false);
   }
 
   private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
     UTIL.getAdmin().enableReplicationPeer(PEER_ID);
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
-          int count = 0;
-          while (reader.next() != null) {
-            count++;
-          }
-          return count >= expectedEntries;
-        } catch (IOException e) {
-          return false;
-        }
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Not enough entries replicated";
-      }
-    });
+    waitUntilReplicationDone(expectedEntries);
   }
 
   @Test
@@ -251,22 +89,7 @@ public class TestSerialReplication {
       }
     }
     enablePeerAndWaitUntilReplicationDone(200);
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
-      long seqId = -1L;
-      int count = 0;
-      for (Entry entry;;) {
-        entry = reader.next();
-        if (entry == null) {
-          break;
-        }
-        assertTrue(
-          "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
-          entry.getKey().getSequenceId() >= seqId);
-        count++;
-      }
-      assertEquals(200, count);
-    }
+    checkOrder(200);
   }
 
   @Test
-- 
2.7.4
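A note on how the seed values are chosen in setLastSequenceIdForSerialPeer: the location seqNum that hbase:meta reports for a region is its openSeqNum, and the patch treats a value greater than 0 as "this region has been re-opened since it was created". Seeding the last pushed sequence id with openSeqNum - 1 marks everything written before the current open as already handled. A worked illustration with hypothetical numbers:

    // The region was moved and re-opened at sequence id 42, so meta reports
    // a location seqNum of 42 for it.
    long openSeqNum = 42L;
    // setLastSequenceIdForSerialPeer stores openSeqNum - 1 = 41 as the last
    // pushed sequence id for this region and peer.
    long lastPushedSeqId = openSeqNum - 1;
    // Roughly speaking, the serial barrier lets a source ship the edit with
    // sequence id s once everything up to s - 1 counts as pushed, so the first
    // shippable edit is 42: the first edit written after the region opened on
    // its current server. Nothing older is ever waited for.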